Coverage for tests / test_parquet.py: 17%

1347 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:37 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Tests for ParquetFormatter. 

29 

30Tests in this module are disabled unless pandas and pyarrow are importable. 

31""" 

32 

33import datetime 

34import os 

35import posixpath 

36import shutil 

37import unittest 

38import uuid 

39 

40try: 

41 import pyarrow as pa 

42except ImportError: 

43 pa = None 

44try: 

45 import astropy.table as atable 

46 from astropy import units 

47except ImportError: 

48 atable = None 

49try: 

50 import numpy as np 

51except ImportError: 

52 np = None 

53try: 

54 import pandas as pd 

55except ImportError: 

56 pd = None 

57 

58try: 

59 import boto3 

60 import botocore 

61 

62 from lsst.resources.s3utils import clean_test_environment_for_s3 

63 

64 try: 

65 from moto import mock_aws # v5 

66 except ImportError: 

67 from moto import mock_s3 as mock_aws 

68except ImportError: 

69 boto3 = None 

70 

71try: 

72 import fsspec 

73except ImportError: 

74 fsspec = None 

75 

76try: 

77 import s3fs 

78except ImportError: 

79 s3fs = None 

80 

81 

82from lsst.daf.butler import ( 

83 Butler, 

84 Config, 

85 DatasetProvenance, 

86 DatasetRef, 

87 DatasetType, 

88 FileDataset, 

89 StorageClassConfig, 

90 StorageClassFactory, 

91) 

92from lsst.resources import ResourcePath 

93 

94try: 

95 from lsst.daf.butler.delegates.arrowtable import ArrowTableDelegate 

96except ImportError: 

97 pa = None 

98 

99try: 

100 from lsst.daf.butler.formatters.parquet import ( 

101 ASTROPY_PANDAS_INDEX_KEY, 

102 ArrowAstropySchema, 

103 ArrowNumpySchema, 

104 DataFrameSchema, 

105 ParquetFormatter, 

106 _append_numpy_multidim_metadata, 

107 _astropy_to_numpy_dict, 

108 _numpy_dict_to_numpy, 

109 _numpy_dtype_to_arrow_types, 

110 _numpy_style_arrays_to_arrow_arrays, 

111 _numpy_to_numpy_dict, 

112 add_pandas_index_to_astropy, 

113 arrow_to_astropy, 

114 arrow_to_numpy, 

115 arrow_to_numpy_dict, 

116 arrow_to_pandas, 

117 astropy_to_arrow, 

118 astropy_to_pandas, 

119 compute_row_group_size, 

120 numpy_dict_to_arrow, 

121 numpy_to_arrow, 

122 pandas_to_arrow, 

123 pandas_to_astropy, 

124 ) 

125except ImportError: 

126 pa = None 

127 pd = None 

128 atable = None 

129 np = None 

130from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir 

131 

132TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

133 

134 

135def _makeSimpleNumpyTable(include_multidim=False, include_bigendian=False): 

136 """Make a simple numpy table with random data. 

137 

138 Parameters 

139 ---------- 

140 include_multidim : `bool` 

141 Include multi-dimensional columns. 

142 include_bigendian : `bool` 

143 Include big-endian columns. 

144 

145 Returns 

146 ------- 

147 numpyTable : `numpy.ndarray` 

148 """ 

149 nrow = 5 

150 

151 dtype = [ 

152 ("index", "i4"), 

153 ("a", "f8"), 

154 ("b", "f8"), 

155 ("c", "f8"), 

156 ("ddd", "f8"), 

157 ("f", "i8"), 

158 ("strcol", "U10"), 

159 ("bytecol", "S10"), 

160 ("dtn", "datetime64[ns]"), 

161 ("dtu", "datetime64[us]"), 

162 ] 

163 

164 if include_multidim: 

165 dtype.extend( 

166 [ 

167 ("d1", "f4", (5,)), 

168 ("d2", "i8", (5, 10)), 

169 ("d3", "f8", (5, 10)), 

170 ] 

171 ) 

172 

173 if include_bigendian: 

174 dtype.extend([("a_bigendian", ">f8"), ("f_bigendian", ">i8")]) 

175 

176 data = np.zeros(nrow, dtype=dtype) 

177 data["index"][:] = np.arange(nrow) 

178 data["a"] = np.random.randn(nrow) 

179 data["b"] = np.random.randn(nrow) 

180 data["c"] = np.random.randn(nrow) 

181 data["ddd"] = np.random.randn(nrow) 

182 data["f"] = np.arange(nrow) * 10 

183 data["strcol"][:] = "teststring" 

184 data["bytecol"][:] = "teststring" 

185 data["dtn"] = datetime.datetime.fromisoformat("2024-07-23") 

186 data["dtu"] = datetime.datetime.fromisoformat("2024-07-23") 

187 

188 if include_multidim: 

189 data["d1"] = np.random.randn(data["d1"].size).reshape(data["d1"].shape) 

190 data["d2"] = np.arange(data["d2"].size).reshape(data["d2"].shape) 

191 data["d3"] = np.asfortranarray(np.random.randn(data["d3"].size).reshape(data["d3"].shape)) 

192 

193 if include_bigendian: 

194 data["a_bigendian"][:] = data["a"] 

195 data["f_bigendian"][:] = data["f"] 

196 

197 return data 

198 

199 

def _makeSingleIndexDataFrame(include_masked=False, include_lists=False):
    """Make a single index data frame for testing.

    Parameters
    ----------
    include_masked : `bool`
        Include masked columns.
    include_lists : `bool`
        Include list columns.

    Returns
    -------
    dataFrame : `~pandas.DataFrame`
        The test dataframe.
    allColumns : `list` [`str`]
        List of all the columns (including index columns).
    """
    df = pd.DataFrame(_makeSimpleNumpyTable()).set_index("index")

    if include_masked:
        nrow = len(df)

        # Nullable columns of several dtypes, with row 1 set to null and
        # an integer value requiring full 64-bit precision in row 0.
        df["m1"] = pd.array(np.arange(nrow), dtype=pd.Int64Dtype())
        df["m2"] = pd.array(np.arange(nrow), dtype=np.float32)
        df["mstrcol"] = pd.array(np.array(["text"] * nrow))
        df.loc[1, ["m1", "m2", "mstrcol"]] = None
        df.loc[0, "m1"] = 1649900760361600113

    if include_lists:
        nrow = len(df)

        # List-valued columns, including one of empty lists.
        df["l1"] = [[0, 0]] * nrow
        df["l2"] = [[0.0, 0.0]] * nrow
        df["l3"] = [[]] * nrow

    all_columns = df.columns.append(pd.Index(df.index.names))

    return df, all_columns

240 

241 

242def _makeMultiIndexDataFrame(): 

243 """Make a multi-index data frame for testing. 

244 

245 Returns 

246 ------- 

247 dataFrame : `~pandas.DataFrame` 

248 The test dataframe. 

249 """ 

250 columns = pd.MultiIndex.from_tuples( 

251 [ 

252 ("g", "a"), 

253 ("g", "b"), 

254 ("g", "c"), 

255 ("r", "a"), 

256 ("r", "b"), 

257 ("r", "c"), 

258 ], 

259 names=["filter", "column"], 

260 ) 

261 df = pd.DataFrame(np.random.randn(5, 6), index=np.arange(5, dtype=int), columns=columns) 

262 

263 return df 

264 

265 

def _makeSimpleAstropyTable(include_multidim=False, include_masked=False, include_bigendian=False):
    """Make an astropy table for testing.

    Parameters
    ----------
    include_multidim : `bool`
        Include multi-dimensional columns.
    include_masked : `bool`
        Include masked columns.
    include_bigendian : `bool`
        Include big-endian columns.

    Returns
    -------
    astropyTable : `astropy.table.Table`
        The test table.
    """
    table = atable.Table(
        _makeSimpleNumpyTable(include_multidim=include_multidim, include_bigendian=include_bigendian)
    )
    # Attach units and descriptions to a couple of columns.
    table["a"].unit = units.degree
    table["a"].description = "Description of column a"
    table["b"].unit = units.meter
    table["b"].description = "Description of column b"

    if include_masked:
        # Masked columns are filled with explicit sentinel values under
        # the mask so they can be checked after serialization.
        nrow = len(table)
        mask = np.zeros(nrow, dtype=bool)
        mask[1] = True

        # Masked 64-bit integer, with a value needing full precision.
        arr = np.arange(nrow, dtype="i8")
        arr[mask] = -1
        arr[0] = 1649900760361600113
        table["m_i8"] = np.ma.masked_array(data=arr, mask=mask, fill_value=-1)
        # Masked 32-bit float, plus an unmasked twin containing NaN.
        arr = np.arange(nrow, dtype="f4")
        arr[mask] = np.nan
        table["m_f4"] = np.ma.masked_array(data=arr, mask=mask, fill_value=np.nan)
        table["um_f4"] = arr
        # Masked 64-bit float, plus an unmasked twin containing NaN.
        arr = np.arange(nrow, dtype="f8")
        arr[mask] = np.nan
        table["m_f8"] = np.ma.masked_array(data=arr, mask=mask, fill_value=np.nan)
        table["um_f8"] = arr
        # Masked boolean.
        arr = np.zeros(nrow, dtype=np.bool_)
        arr[mask] = True
        table["m_bool"] = np.ma.masked_array(data=arr, mask=mask, fill_value=True)
        # Masked unsigned 32-bit integer.
        arr = np.arange(nrow, dtype="u4")
        arr[mask] = 0
        table["m_u4"] = np.ma.masked_array(data=arr, mask=mask, fill_value=0)
        # Masked string column.
        table["m_str"] = np.ma.masked_array(data=np.array(["text"] * nrow), mask=mask, fill_value="")
        # Masked bytes column.
        table["m_byte"] = np.ma.masked_array(data=np.array([b"bytes"] * nrow), mask=mask, fill_value=b"")

    return table

330 

331 

def _makeSimpleArrowTable(include_multidim=False, include_masked=False):
    """Make an arrow table for testing.

    Parameters
    ----------
    include_multidim : `bool`
        Include multi-dimensional columns.
    include_masked : `bool`
        Include masked columns.

    Returns
    -------
    arrowTable : `pyarrow.Table`
        The test table.
    """
    # Build the astropy test table and convert it to arrow.
    return astropy_to_arrow(
        _makeSimpleAstropyTable(include_multidim=include_multidim, include_masked=include_masked)
    )

349 

350 

351@unittest.skipUnless(pd is not None, "Cannot test ParquetFormatterDataFrame without pandas.") 

352@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterDataFrame without pyarrow.") 

353class ParquetFormatterDataFrameTestCase(unittest.TestCase): 

354 """Tests for ParquetFormatter, DataFrame, using local file datastore.""" 

355 

356 configFile = os.path.join(TESTDIR, "config/basic/butler.yaml") 

357 

358 def setUp(self): 

359 """Create a new butler root for each test.""" 

360 self.root = makeTestTempDir(TESTDIR) 

361 config = Config(self.configFile) 

362 self.run = "test_run" 

363 self.butler = Butler.from_config( 

364 Butler.makeRepo(self.root, config=config), writeable=True, run=self.run 

365 ) 

366 self.enterContext(self.butler) 

367 # No dimensions in dataset type so we don't have to worry about 

368 # inserting dimension data or defining data IDs. 

369 self.datasetType = DatasetType( 

370 "data", dimensions=(), storageClass="DataFrame", universe=self.butler.dimensions 

371 ) 

372 self.butler.registry.registerDatasetType(self.datasetType) 

373 

374 def tearDown(self): 

375 removeTestTempDir(self.root) 

376 

377 def testSingleIndexDataFrame(self): 

378 df1, allColumns = _makeSingleIndexDataFrame(include_masked=True) 

379 

380 self.butler.put(df1, self.datasetType, dataId={}) 

381 # Read the whole DataFrame. 

382 df2 = self.butler.get(self.datasetType, dataId={}) 

383 self.assertTrue(df1.equals(df2)) 

384 # Read just the column descriptions. 

385 columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={}) 

386 self.assertTrue(allColumns.equals(columns2)) 

387 # Read the rowcount. 

388 rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={}) 

389 self.assertEqual(rowcount, len(df1)) 

390 # Read the schema. 

391 schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={}) 

392 self.assertEqual(schema, DataFrameSchema(df1)) 

393 # Read just some columns a few different ways. 

394 df3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]}) 

395 self.assertTrue(df1.loc[:, ["a", "c"]].equals(df3)) 

396 df4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"}) 

397 self.assertTrue(df1.loc[:, ["a"]].equals(df4)) 

398 df5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]}) 

399 self.assertTrue(df1.loc[:, ["a"]].equals(df5)) 

400 df6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"}) 

401 self.assertTrue(df1.loc[:, ["ddd"]].equals(df6)) 

402 df7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]}) 

403 self.assertTrue(df1.loc[:, ["a"]].equals(df7)) 

404 df8 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d*"]}) 

405 self.assertTrue(df1.loc[:, ["ddd", "dtn", "dtu"]].equals(df8)) 

406 df9 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d*", "d*"]}) 

407 self.assertTrue(df1.loc[:, ["ddd", "dtn", "dtu"]].equals(df9)) 

408 # Passing an unrecognized column should be a ValueError. 

409 with self.assertRaises(ValueError): 

410 self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]}) 

411 

412 def testSingleIndexDataFrameWithLists(self): 

413 df1, allColumns = _makeSingleIndexDataFrame(include_lists=True) 

414 

415 self.butler.put(df1, self.datasetType, dataId={}) 

416 # Read the whole DataFrame. 

417 df2 = self.butler.get(self.datasetType, dataId={}) 

418 

419 # We need to check the list columns specially because they go 

420 # from lists to arrays. 

421 for col in ["l1", "l2", "l3"]: 

422 for i in range(len(df1)): 

423 self.assertTrue(np.all(df2[col].values[i] == df1[col].values[i])) 

424 

425 def testMultiIndexDataFrame(self): 

426 df1 = _makeMultiIndexDataFrame() 

427 

428 self.butler.put(df1, self.datasetType, dataId={}) 

429 # Read the whole DataFrame. 

430 df2 = self.butler.get(self.datasetType, dataId={}) 

431 self.assertTrue(df1.equals(df2)) 

432 # Read just the column descriptions. 

433 columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={}) 

434 self.assertTrue(df1.columns.equals(columns2)) 

435 self.assertEqual(columns2.names, df1.columns.names) 

436 # Read the rowcount. 

437 rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={}) 

438 self.assertEqual(rowcount, len(df1)) 

439 # Read the schema. 

440 schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={}) 

441 self.assertEqual(schema, DataFrameSchema(df1)) 

442 # Read just some columns a few different ways. 

443 df3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": {"filter": "g"}}) 

444 self.assertTrue(df1.loc[:, ["g"]].equals(df3)) 

445 df4 = self.butler.get( 

446 self.datasetType, dataId={}, parameters={"columns": {"filter": ["r"], "column": "a"}} 

447 ) 

448 self.assertTrue(df1.loc[:, [("r", "a")]].equals(df4)) 

449 column_list = [("g", "a"), ("r", "c")] 

450 df5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": column_list}) 

451 self.assertTrue(df1.loc[:, column_list].equals(df5)) 

452 column_dict = {"filter": "r", "column": ["a", "b"]} 

453 df6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": column_dict}) 

454 self.assertTrue(df1.loc[:, [("r", "a"), ("r", "b")]].equals(df6)) 

455 # Passing an unrecognized column should be a ValueError. 

456 with self.assertRaises(ValueError): 

457 self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d"]}) 

458 

459 def testSingleIndexDataFrameEmptyString(self): 

460 """Test persisting a single index dataframe with empty strings.""" 

461 df1, _ = _makeSingleIndexDataFrame() 

462 

463 # Set one of the strings to None 

464 df1.at[1, "strcol"] = None 

465 

466 self.butler.put(df1, self.datasetType, dataId={}) 

467 # Read the whole DataFrame. 

468 df2 = self.butler.get(self.datasetType, dataId={}) 

469 self.assertTrue(df1.equals(df2)) 

470 

471 def testSingleIndexDataFrameAllEmptyStrings(self): 

472 """Test persisting a single index dataframe with an empty string 

473 column. 

474 """ 

475 df1, _ = _makeSingleIndexDataFrame() 

476 

477 # Set all of the strings to None 

478 df1.loc[0:, "strcol"] = None 

479 

480 self.butler.put(df1, self.datasetType, dataId={}) 

481 # Read the whole DataFrame. 

482 df2 = self.butler.get(self.datasetType, dataId={}) 

483 self.assertTrue(df1.equals(df2)) 

484 

485 def testLegacyDataFrame(self): 

486 """Test writing a dataframe to parquet via pandas (without additional 

487 metadata) and ensure that we can read it back with all the new 

488 functionality. 

489 """ 

490 df1, allColumns = _makeSingleIndexDataFrame() 

491 

492 fname = os.path.join(self.root, "test_dataframe.parq") 

493 df1.to_parquet(fname) 

494 

495 legacy_type = DatasetType( 

496 "legacy_dataframe", 

497 dimensions=(), 

498 storageClass="DataFrame", 

499 universe=self.butler.dimensions, 

500 ) 

501 self.butler.registry.registerDatasetType(legacy_type) 

502 

503 data_id = {} 

504 ref = DatasetRef(legacy_type, data_id, run=self.run) 

505 dataset = FileDataset(path=fname, refs=[ref], formatter=ParquetFormatter) 

506 

507 self.butler.ingest(dataset, transfer="copy") 

508 

509 self.butler.put(df1, self.datasetType, dataId={}) 

510 

511 df2a = self.butler.get(self.datasetType, dataId={}) 

512 df2b = self.butler.get("legacy_dataframe", dataId={}) 

513 self.assertTrue(df2a.equals(df2b)) 

514 

515 df3a = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a"]}) 

516 df3b = self.butler.get("legacy_dataframe", dataId={}, parameters={"columns": ["a"]}) 

517 self.assertTrue(df3a.equals(df3b)) 

518 

519 columns2a = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={}) 

520 columns2b = self.butler.get("legacy_dataframe.columns", dataId={}) 

521 self.assertTrue(columns2a.equals(columns2b)) 

522 

523 rowcount2a = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={}) 

524 rowcount2b = self.butler.get("legacy_dataframe.rowcount", dataId={}) 

525 self.assertEqual(rowcount2a, rowcount2b) 

526 

527 schema2a = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={}) 

528 schema2b = self.butler.get("legacy_dataframe.schema", dataId={}) 

529 self.assertEqual(schema2a, schema2b) 

530 

531 def testDataFrameSchema(self): 

532 tab1 = _makeSimpleArrowTable() 

533 

534 schema = DataFrameSchema.from_arrow(tab1.schema) 

535 

536 self.assertIsInstance(schema.schema, pd.DataFrame) 

537 self.assertEqual(repr(schema), repr(schema._schema)) 

538 self.assertNotEqual(schema, "not_a_schema") 

539 self.assertEqual(schema, schema) 

540 

541 tab2 = _makeMultiIndexDataFrame() 

542 schema2 = DataFrameSchema(tab2) 

543 

544 self.assertNotEqual(schema, schema2) 

545 

546 @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.") 

547 def testWriteSingleIndexDataFrameReadAsAstropyTable(self): 

548 df1, allColumns = _makeSingleIndexDataFrame() 

549 

550 self.butler.put(df1, self.datasetType, dataId={}) 

551 

552 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy") 

553 

554 tab2_df = tab2.to_pandas(index="index") 

555 self.assertTrue(df1.equals(tab2_df)) 

556 

557 # Check reading the columns. 

558 columns = list(tab2.columns.keys()) 

559 columns2 = self.butler.get( 

560 self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList" 

561 ) 

562 # We check the set because pandas reorders the columns. 

563 self.assertEqual(set(columns2), set(columns)) 

564 

565 # Check reading the schema. 

566 schema = ArrowAstropySchema(tab2) 

567 schema2 = self.butler.get( 

568 self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowAstropySchema" 

569 ) 

570 

571 # The string types are objectified by pandas, and the order 

572 # will be changed because of pandas indexing. 

573 self.assertEqual(len(schema2.schema.columns), len(schema.schema.columns)) 

574 for name in schema.schema.columns: 

575 self.assertIn(name, schema2.schema.columns) 

576 if schema2.schema[name].dtype != np.dtype("O"): 

577 self.assertEqual(schema2.schema[name].dtype, schema.schema[name].dtype) 

578 

579 @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.") 

580 def testWriteSingleIndexDataFrameWithMaskedColsReadAsAstropyTable(self): 

581 # We need to special-case the write-as-pandas read-as-astropy code 

582 # with masks because pandas has multiple ways to use masked columns. 

583 # (The string column mask handling in particular is frustratingly 

584 # inconsistent.) 

585 df1, allColumns = _makeSingleIndexDataFrame(include_masked=True) 

586 

587 self.butler.put(df1, self.datasetType, dataId={}) 

588 

589 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy") 

590 tab2_df = astropy_to_pandas(tab2, index="index") 

591 

592 self.assertTrue(df1.columns.equals(tab2_df.columns)) 

593 for name in tab2_df.columns: 

594 col1 = df1[name] 

595 col2 = tab2_df[name] 

596 

597 if col1.hasnans: 

598 notNull = col1.notnull() 

599 self.assertTrue(notNull.equals(col2.notnull())) 

600 # Need to check value-by-value because column may 

601 # be made of objects, depending on what pandas decides. 

602 for index in notNull.values.nonzero()[0]: 

603 self.assertEqual(col1[index], col2[index]) 

604 else: 

605 self.assertTrue(col1.equals(col2)) 

606 

607 @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.") 

608 def testWriteMultiIndexDataFrameReadAsAstropyTable(self): 

609 df1 = _makeMultiIndexDataFrame() 

610 

611 self.butler.put(df1, self.datasetType, dataId={}) 

612 

613 _ = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy") 

614 

615 # This is an odd duck, it doesn't really round-trip. 

616 # This test simply checks that it's readable, but definitely not 

617 # recommended. 

618 

619 @unittest.skipUnless(atable is not None, "Cannot test writing as astropy without astropy.") 

620 def testWriteAstropyTableWithMaskedColsReadAsSingleIndexDataFrame(self): 

621 tab1 = _makeSimpleAstropyTable(include_masked=True) 

622 

623 self.butler.put(tab1, self.datasetType, dataId={}) 

624 

625 tab2 = self.butler.get(self.datasetType, dataId={}) 

626 

627 tab1_df = astropy_to_pandas(tab1) 

628 self.assertTrue(tab1_df.equals(tab2)) 

629 

630 tab2_astropy = pandas_to_astropy(tab2) 

631 for col in tab1.dtype.names: 

632 np.testing.assert_array_equal(tab2_astropy[col], tab1[col]) 

633 if isinstance(tab1[col], atable.column.MaskedColumn): 

634 np.testing.assert_array_equal(tab2_astropy[col].mask, tab1[col].mask) 

635 

636 @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.") 

637 def testWriteSingleIndexDataFrameReadAsArrowTable(self): 

638 df1, allColumns = _makeSingleIndexDataFrame() 

639 

640 self.butler.put(df1, self.datasetType, dataId={}) 

641 

642 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable") 

643 

644 tab2_df = arrow_to_pandas(tab2) 

645 self.assertTrue(df1.equals(tab2_df)) 

646 

647 # Check reading the columns. 

648 columns = list(tab2.schema.names) 

649 columns2 = self.butler.get( 

650 self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList" 

651 ) 

652 # We check the set because pandas reorders the columns. 

653 self.assertEqual(set(columns), set(columns2)) 

654 

655 # Override the component using a dataset type. 

656 columnsType = self.datasetType.makeComponentDatasetType("columns").overrideStorageClass( 

657 "ArrowColumnList" 

658 ) 

659 self.assertEqual(columns2, self.butler.get(columnsType)) 

660 

661 # Check getting a component while overriding the storage class via 

662 # the dataset type. This overrides the parent storage class and then 

663 # selects the component. 

664 columnsType = self.datasetType.overrideStorageClass("ArrowAstropy").makeComponentDatasetType( 

665 "columns" 

666 ) 

667 self.assertEqual(columns2, self.butler.get(columnsType)) 

668 

669 # Check reading the schema. 

670 schema = tab2.schema 

671 schema2 = self.butler.get( 

672 self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowSchema" 

673 ) 

674 

675 # These will not have the same metadata, nor will the string column 

676 # information be maintained. 

677 self.assertEqual(len(schema.names), len(schema2.names)) 

678 for name in schema.names: 

679 if schema.field(name).type not in (pa.string(), pa.binary()): 

680 self.assertEqual(schema.field(name).type, schema2.field(name).type) 

681 

682 @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.") 

683 def testWriteMultiIndexDataFrameReadAsArrowTable(self): 

684 df1 = _makeMultiIndexDataFrame() 

685 

686 self.butler.put(df1, self.datasetType, dataId={}) 

687 

688 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable") 

689 

690 tab2_df = arrow_to_pandas(tab2) 

691 self.assertTrue(df1.equals(tab2_df)) 

692 

693 @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.") 

694 def testWriteSingleIndexDataFrameReadAsNumpyTable(self): 

695 df1, allColumns = _makeSingleIndexDataFrame() 

696 

697 self.butler.put(df1, self.datasetType, dataId={}) 

698 

699 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy") 

700 

701 tab2_df = pd.DataFrame.from_records(tab2, index=["index"]) 

702 self.assertTrue(df1.equals(tab2_df)) 

703 

704 # Check reading the columns. 

705 columns = list(tab2.dtype.names) 

706 columns2 = self.butler.get( 

707 self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList" 

708 ) 

709 # We check the set because pandas reorders the columns. 

710 self.assertEqual(set(columns2), set(columns)) 

711 

712 # Check reading the schema. 

713 schema = ArrowNumpySchema(tab2.dtype) 

714 schema2 = self.butler.get( 

715 self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowNumpySchema" 

716 ) 

717 

718 # The string types will be objectified by pandas, and the order 

719 # will be changed because of pandas indexing. 

720 self.assertEqual(len(schema.schema.names), len(schema2.schema.names)) 

721 for name in schema.schema.names: 

722 self.assertIn(name, schema2.schema.names) 

723 self.assertEqual(schema2.schema[name].type, schema.schema[name].type) 

724 

725 @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.") 

726 def testWriteMultiIndexDataFrameReadAsNumpyTable(self): 

727 df1 = _makeMultiIndexDataFrame() 

728 

729 self.butler.put(df1, self.datasetType, dataId={}) 

730 

731 _ = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy") 

732 

733 # This is an odd duck, it doesn't really round-trip. 

734 # This test simply checks that it's readable, but definitely not 

735 # recommended. 

736 

737 @unittest.skipUnless(np is not None, "Cannot test reading as numpy dict without numpy.") 

738 def testWriteSingleIndexDataFrameReadAsNumpyDict(self): 

739 df1, allColumns = _makeSingleIndexDataFrame() 

740 

741 self.butler.put(df1, self.datasetType, dataId={}) 

742 

743 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict") 

744 

745 tab2_df = pd.DataFrame.from_records(tab2, index=["index"]) 

746 # The column order is not maintained. 

747 self.assertEqual(set(df1.columns), set(tab2_df.columns)) 

748 for col in df1.columns: 

749 self.assertTrue(np.all(df1[col].values == tab2_df[col].values)) 

750 

751 @unittest.skipUnless(np is not None, "Cannot test reading as numpy dict without numpy.") 

752 def testWriteMultiIndexDataFrameReadAsNumpyDict(self): 

753 df1 = _makeMultiIndexDataFrame() 

754 

755 self.butler.put(df1, self.datasetType, dataId={}) 

756 

757 _ = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict") 

758 

759 # This is an odd duck, it doesn't really round-trip. 

760 # This test simply checks that it's readable, but definitely not 

761 # recommended. 

762 

763 def testBadDataFrameColumnParquet(self): 

764 df1, allColumns = _makeSingleIndexDataFrame() 

765 

766 # Make a column with mixed type. 

767 bad_col1 = [0.0] * len(df1) 

768 bad_col1[1] = 0.0 * units.nJy 

769 bad_df = df1.copy() 

770 bad_df["bad_col1"] = bad_col1 

771 

772 # At the moment we cannot check that the correct note is added 

773 # to the exception, but that will be possible in the future. 

774 with self.assertRaises(RuntimeError): 

775 self.butler.put(bad_df, self.datasetType, dataId={}) 

776 

777 @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.") 

778 def testWriteReadAstropyTableLossless(self): 

779 tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True) 

780 

781 put_ref = self.butler.put(tab1, self.datasetType, dataId={}) 

782 

783 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy") 

784 

785 # Check that minimal provenance was written by default. 

786 expected = { 

787 "LSST.BUTLER.ID": str(put_ref.id), 

788 "LSST.BUTLER.RUN": "test_run", 

789 "LSST.BUTLER.DATASETTYPE": "data", 

790 "LSST.BUTLER.N_INPUTS": 0, 

791 } 

792 

793 self.assertEqual(tab2.meta, expected) 

794 

795 _checkAstropyTableEquality(tab1, tab2) 

796 

797 @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.") 

798 def testWriteReadAstropyTableProvenance(self): 

799 tab1 = _makeSimpleAstropyTable() 

800 

801 # Create a ref for provenance. 

802 astropy_type = DatasetType( 

803 "astropy_parquet", 

804 dimensions=(), 

805 storageClass="ArrowAstropy", 

806 universe=self.butler.dimensions, 

807 ) 

808 self.butler.registry.registerDatasetType(astropy_type) 

809 input_ref = DatasetRef(astropy_type, {}, run="other_run") 

810 quantum_id = uuid.uuid4() 

811 provenance = DatasetProvenance(quantum_id=quantum_id) 

812 provenance.add_input(input_ref) 

813 

814 put_ref = self.butler.put(tab1, self.datasetType, dataId={}, provenance=provenance) 

815 

816 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy") 

817 

818 expected = { 

819 "LSST.BUTLER.ID": str(put_ref.id), 

820 "LSST.BUTLER.RUN": "test_run", 

821 "LSST.BUTLER.DATASETTYPE": "data", 

822 "LSST.BUTLER.QUANTUM": str(quantum_id), 

823 "LSST.BUTLER.N_INPUTS": 1, 

824 "LSST.BUTLER.INPUT.0.ID": str(input_ref.id), 

825 "LSST.BUTLER.INPUT.0.RUN": "other_run", 

826 "LSST.BUTLER.INPUT.0.DATASETTYPE": "astropy_parquet", 

827 } 

828 self.assertEqual(tab2.meta, expected) 

829 

830 # Put the dataset again, with different provenance and ensure 

831 # that the previous provenance was stripped. 

832 self.butler.collections.register("new_run") 

833 put_ref3 = self.butler.put(tab2, self.datasetType, dataId={}, run="new_run") 

834 

835 # tab2 will have been updated in place. 

836 expected = { 

837 "LSST.BUTLER.ID": str(put_ref3.id), 

838 "LSST.BUTLER.RUN": "new_run", 

839 "LSST.BUTLER.DATASETTYPE": "data", 

840 "LSST.BUTLER.N_INPUTS": 0, 

841 } 

842 self.assertEqual(tab2.meta, expected) 

843 null_prov, prov_ref = DatasetProvenance.from_flat_dict(tab2.meta, self.butler) 

844 self.assertEqual(prov_ref, put_ref3) 

845 self.assertEqual(null_prov, DatasetProvenance()) 

846 

847 @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.") 

848 def testWriteReadNumpyTableLossless(self): 

849 tab1 = _makeSimpleNumpyTable(include_multidim=True) 

850 

851 self.butler.put(tab1, self.datasetType, dataId={}) 

852 

853 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy") 

854 

855 _checkNumpyTableEquality(tab1, tab2) 

856 

857 @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.") 

858 def testMaskedNumpy(self): 

859 tab1 = _makeSimpleArrowTable(include_multidim=False, include_masked=True) 

860 tab1_np = arrow_to_numpy(tab1) 

861 self.assertIsInstance(tab1_np, np.ma.MaskedArray) 

862 # Stats on a masked column should ignore the nan in row 1. 

863 col = tab1_np["m_f8"] 

864 self.assertEqual(np.mean(col), 2.25, f"Column: {col}") 

865 

866 # Now without a mask. 

867 tab1 = _makeSimpleArrowTable(include_multidim=False, include_masked=False) 

868 tab1_np = arrow_to_numpy(tab1) 

869 self.assertNotIsInstance(tab1_np, np.ma.MaskedArray) 

870 

871 @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.") 

872 def testWriteReadArrowTableLossless(self): 

873 tab1 = _makeSimpleArrowTable(include_multidim=False, include_masked=True) 

874 

875 self.butler.put(tab1, self.datasetType, dataId={}) 

876 

877 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable") 

878 

879 self.assertEqual(tab1.schema, tab2.schema) 

880 tab1_np = arrow_to_numpy(tab1) 

881 tab2_np = arrow_to_numpy(tab2) 

882 for col in tab1.column_names: 

883 np.testing.assert_array_equal(tab2_np[col], tab1_np[col]) 

884 

885 @unittest.skipUnless(np is not None, "Cannot test reading as numpy dict without numpy.") 

886 def testWriteReadNumpyDictLossless(self): 

887 tab1 = _makeSimpleNumpyTable(include_multidim=True) 

888 dict1 = _numpy_to_numpy_dict(tab1) 

889 

890 self.butler.put(tab1, self.datasetType, dataId={}) 

891 

892 dict2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict") 

893 

894 _checkNumpyDictEquality(dict1, dict2) 

895 

896 

@unittest.skipUnless(pd is not None, "Cannot test InMemoryDatastore with DataFrames without pandas.")
class InMemoryDataFrameDelegateTestCase(ParquetFormatterDataFrameTestCase):
    """Tests for InMemoryDatastore, using ArrowTableDelegate with Dataframe."""

    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testBadDataFrameColumnParquet(self):
        # This test does not raise for an in-memory datastore.
        pass

    def testWriteMultiIndexDataFrameReadAsAstropyTable(self):
        """A multi-index dataframe cannot be converted to astropy."""
        frame = _makeMultiIndexDataFrame()
        self.butler.put(frame, self.datasetType, dataId={})
        with self.assertRaises(ValueError):
            _ = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

    def testLegacyDataFrame(self):
        # This test does not work with an inMemoryDatastore.
        pass

    def testBadInput(self):
        """The delegate should reject non-dataframe inputs and unknown
        components.
        """
        frame, _ = _makeSingleIndexDataFrame()
        delegate = ArrowTableDelegate("DataFrame")

        with self.assertRaises(ValueError):
            delegate.handleParameters(inMemoryDataset="not_a_dataframe")
        with self.assertRaises(AttributeError):
            delegate.getComponent(composite=frame, componentName="nothing")

    def testStorageClass(self):
        """Storage class lookup by type should resolve to DataFrame with
        and without type comparison.
        """
        frame, _ = _makeSingleIndexDataFrame()

        factory = StorageClassFactory()
        factory.addFromConfig(StorageClassConfig())

        for compare in (False, True):
            storage_class = factory.findStorageClass(type(frame), compare_types=compare)
            # Force the name lookup to do name matching.
            storage_class._pytype = None
            self.assertEqual(storage_class.name, "DataFrame")

944 

945 

@unittest.skipUnless(atable is not None, "Cannot test ParquetFormatterArrowAstropy without astropy.")
@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterArrowAstropy without pyarrow.")
class ParquetFormatterArrowAstropyTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, ArrowAstropy, using local file datastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        self.run = "test_run"
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=Config(self.configFile)), writeable=True, run=self.run
        )
        self.enterContext(self.butler)
        # A dimensionless dataset type means no dimension records or
        # non-trivial data IDs are needed.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowAstropy", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Remove the per-test repository root."""
        removeTestTempDir(self.root)

    def testAstropyTable(self):
        """Round-trip an astropy table and exercise all component reads and
        column-subset parameters.
        """
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        # Full-table read.
        tab2 = self.butler.get(self.datasetType, dataId={})
        _checkAstropyTableEquality(tab1, tab2)

        # Columns component.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(tab1.dtype.names))
        for read_name, expected_name in zip(columns2, tab1.dtype.names):
            self.assertEqual(read_name, expected_name)

        # Rowcount component.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(tab1))

        # Schema component.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, ArrowAstropySchema(tab1))

        # Column-subset reads expressed several different ways, including
        # string scalars, duplicates, and glob patterns.
        subset_cases = [
            (["a", "c"], ("a", "c")),
            ("a", ("a",)),
            (["index", "a"], ("index", "a")),
            ("ddd", ("ddd",)),
            (["a", "a"], ("a",)),
            (["d??"], ("ddd", "dtn", "dtu")),
            (["d??", "a*"], ("ddd", "dtn", "dtu", "a")),
        ]
        for request, expected_cols in subset_cases:
            read_subset = self.butler.get(
                self.datasetType, dataId={}, parameters={"columns": request}
            )
            _checkAstropyTableEquality(tab1[expected_cols], read_subset)

        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

    def testAstropyTableBigEndian(self):
        """Big-endian columns should round-trip."""
        tab1 = _makeSimpleAstropyTable(include_bigendian=True)
        self.butler.put(tab1, self.datasetType, dataId={})
        tab2 = self.butler.get(self.datasetType, dataId={})
        _checkAstropyTableEquality(tab1, tab2, has_bigendian=True)

    def testAstropyTableWithMetadata(self):
        """Table metadata of assorted types should round-trip."""
        tab1 = _makeSimpleAstropyTable(include_multidim=True)
        tab1.meta.update(
            {
                "meta_a": 5,
                "meta_b": 10.0,
                "meta_c": [1, 2, 3],
                "meta_d": True,
                "meta_e": "string",
            }
        )

        self.butler.put(tab1, self.datasetType, dataId={})
        tab2 = self.butler.get(self.datasetType, dataId={})
        # This will check that the metadata is equivalent as well.
        _checkAstropyTableEquality(tab1, tab2)

    def testArrowAstropySchema(self):
        """Check ArrowAstropySchema construction, repr, and (in)equality."""
        tab1 = _makeSimpleAstropyTable()
        tab1_arrow = astropy_to_arrow(tab1)
        schema = ArrowAstropySchema.from_arrow(tab1_arrow.schema)

        self.assertIsInstance(schema.schema, atable.Table)
        self.assertEqual(repr(schema), repr(schema._schema))
        self.assertNotEqual(schema, "not_a_schema")
        self.assertEqual(schema, schema)

        # Each mutation below should make the derived schema unequal.
        def _rename(t):
            t.rename_column("index", "index2")

        def _set_unit(t):
            t["index"].unit = units.micron

        def _set_description(t):
            t["index"].description = "Index column"

        def _set_format(t):
            t["index"].format = "%05d"

        for mutate in (_rename, _set_unit, _set_description, _set_format):
            tab2 = tab1.copy()
            mutate(tab2)
            self.assertNotEqual(ArrowAstropySchema(tab2), schema)

    def testAstropyParquet(self):
        """Ingest a parquet file written directly by astropy and compare it
        with the same table written through the butler.
        """
        tab1 = _makeSimpleAstropyTable()

        # Remove datetime columns which don't work with astropy currently.
        del tab1["dtn"]
        del tab1["dtu"]

        fname = os.path.join(self.root, "test_astropy.parq")
        tab1.write(fname)

        astropy_type = DatasetType(
            "astropy_parquet",
            dimensions=(),
            storageClass="ArrowAstropy",
            universe=self.butler.dimensions,
        )
        self.butler.registry.registerDatasetType(astropy_type)

        ref = DatasetRef(astropy_type, {}, run=self.run)
        dataset = FileDataset(path=fname, refs=[ref], formatter=ParquetFormatter)
        self.butler.ingest(dataset, transfer="copy")

        self.butler.put(tab1, self.datasetType, dataId={})

        # Full tables must agree.
        tab2a = self.butler.get(self.datasetType, dataId={})
        tab2b = self.butler.get("astropy_parquet", dataId={})
        _checkAstropyTableEquality(tab2a, tab2b)

        # Component reads must agree too.
        columns2a = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        columns2b = self.butler.get("astropy_parquet.columns", dataId={})
        self.assertEqual(len(columns2b), len(columns2a))
        for name_b, name_a in zip(columns2b, columns2a):
            self.assertEqual(name_b, name_a)

        rowcount2a = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        rowcount2b = self.butler.get("astropy_parquet.rowcount", dataId={})
        self.assertEqual(rowcount2a, rowcount2b)

        schema2a = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        schema2b = self.butler.get("astropy_parquet.schema", dataId={})
        self.assertEqual(schema2a, schema2b)

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testWriteAstropyReadAsArrowTable(self):
        """Astropy tables (including masked columns) should read back
        correctly as arrow tables, with matching components.
        """
        # The astropy <-> arrow conversion works fine with masked columns.
        tab1 = _makeSimpleAstropyTable(include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable")
        _checkAstropyTableEquality(tab1, arrow_to_astropy(tab2))

        # Check reading the columns.
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, tab2.schema.names)

        # Check reading the schema.
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowSchema"
        )
        self.assertEqual(tab2.schema, schema2)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteAstropyReadAsDataFrame(self):
        """Astropy tables should read back as equivalent dataframes."""
        tab1 = _makeSimpleAstropyTable()

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")

        # This is tricky because it loses the units and gains a bonus pandas
        # _index_ column, so we just test the dataframe form.
        self.assertTrue(tab1.to_pandas().equals(tab2))

        # Check reading the columns.
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="DataFrameIndex"
        )
        self.assertTrue(tab2.columns.equals(columns2))

        # Check reading the schema.
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="DataFrameSchema"
        )
        self.assertEqual(schema2, DataFrameSchema(tab2))

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteAstropyWithMaskedColsReadAsDataFrame(self):
        """Masked astropy columns should survive conversion to pandas."""
        # We need to special-case the write-as-astropy read-as-pandas code
        # with masks because pandas has multiple ways to use masked columns.
        # (When writing an astropy table with masked columns we get an object
        # column back, but each unmasked element has the correct type.)
        tab1 = _makeSimpleAstropyTable(include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")
        tab1_df = astropy_to_pandas(tab1)

        self.assertTrue(tab1_df.columns.equals(tab2.columns))
        for name in tab2.columns:
            expected_col = tab1_df[name]
            read_col = tab2[name]

            if expected_col.hasnans:
                not_null = expected_col.notnull()
                self.assertTrue(not_null.equals(read_col.notnull()))
                # Need to check value-by-value because column may
                # be made of objects, depending on what pandas decides.
                for index in not_null.values.nonzero()[0]:
                    self.assertEqual(expected_col[index], read_col[index])
            else:
                self.assertTrue(expected_col.equals(read_col))

    @unittest.skipUnless(pd is not None, "Cannot test writing as a dataframe without pandas.")
    def testWriteSingleIndexDataFrameWithMaskedColsReadAsAstropyTable(self):
        """A masked single-index dataframe should read back as the
        equivalent astropy table.
        """
        df1, _ = _makeSingleIndexDataFrame(include_masked=True)

        self.butler.put(df1, self.datasetType, dataId={})
        tab2 = self.butler.get(self.datasetType, dataId={})
        _checkAstropyTableEquality(pandas_to_astropy(df1), tab2)

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
    def testWriteAstropyReadAsNumpyTable(self):
        """Astropy tables should read back as equivalent numpy tables."""
        tab1 = _makeSimpleAstropyTable()
        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy")

        # This is tricky because it loses the units.
        _checkAstropyTableEquality(tab1, atable.Table(tab2), skip_units=True)

        # Check reading the columns.
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, list(tab2.dtype.names))

        # Check reading the schema.
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowNumpySchema"
        )
        self.assertEqual(schema2, ArrowNumpySchema(tab2.dtype))

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
    def testWriteAstropyReadAsNumpyDict(self):
        """Astropy tables should read back as equivalent numpy dicts."""
        tab1 = _makeSimpleAstropyTable()
        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict")

        # This is tricky because it loses the units.
        _checkAstropyTableEquality(tab1, atable.Table(tab2), skip_units=True)

    def testBadAstropyColumnParquet(self):
        """Columns with mixed types or ragged rows must fail to serialize."""
        tab1 = _makeSimpleAstropyTable()

        # Make a column with mixed type.
        mixed_col = [0.0] * len(tab1)
        mixed_col[1] = 0.0 * units.nJy
        bad_tab = tab1.copy()
        bad_tab["bad_col1"] = mixed_col

        # At the moment we cannot check that the correct note is added
        # to the exception, but that will be possible in the future.
        with self.assertRaises(RuntimeError):
            self.butler.put(bad_tab, self.datasetType, dataId={})

        # Make a column with ragged size.
        ragged_col = [[0]] * len(tab1)
        ragged_col[1] = [0, 0]
        bad_tab = tab1.copy()
        bad_tab["bad_col2"] = ragged_col

        with self.assertRaises(RuntimeError):
            self.butler.put(bad_tab, self.datasetType, dataId={})

    @unittest.skipUnless(pd is not None, "Cannot test ParquetFormatterDataFrame without pandas.")
    def testWriteAstropyTableWithPandasIndexHint(self, testStrip=True):
        """The pandas-index hint stored in table metadata should survive a
        round-trip and control the index of dataframe reads.
        """
        tab1 = _makeSimpleAstropyTable()
        add_pandas_index_to_astropy(tab1, "index")

        self.butler.put(tab1, self.datasetType, dataId={})

        # Read in as an astropy table and ensure index hint is still there.
        tab2 = self.butler.get(self.datasetType, dataId={})
        self.assertIn(ASTROPY_PANDAS_INDEX_KEY, tab2.meta)
        self.assertEqual(tab2.meta[ASTROPY_PANDAS_INDEX_KEY], "index")

        # Read as a dataframe and ensure index is set.
        df3 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")
        self.assertEqual(df3.index.name, "index")

        # Read as a dataframe without naming the index column.
        with self.assertLogs(level="WARNING") as cm:
            _ = self.butler.get(
                self.datasetType,
                dataId={},
                storageClass="DataFrame",
                parameters={"columns": ["a", "b"]},
            )
        self.assertIn("Index column ``index``", cm.output[0])

        if testStrip:
            # Read as an astropy table without naming the index column.
            tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "b"]})
            self.assertNotIn(ASTROPY_PANDAS_INDEX_KEY, tab5.meta)

        with self.assertRaises(ValueError):
            add_pandas_index_to_astropy(tab1, "not_a_column")

1307 

1308 

@unittest.skipUnless(atable is not None, "Cannot test InMemoryDatastore with AstropyTable without astropy.")
class InMemoryArrowAstropyDelegateTestCase(ParquetFormatterArrowAstropyTestCase):
    """Tests for InMemoryDatastore, using ArrowTableDelegate with
    AstropyTable.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testAstropyParquet(self):
        # This test does not work with an inMemoryDatastore.
        pass

    def testBadAstropyColumnParquet(self):
        # This test does not raise for an in-memory datastore.
        pass

    def testBadInput(self):
        """The delegate should reject bad datasets, tuple column
        parameters, and unknown components.
        """
        table = _makeSimpleAstropyTable()
        delegate = ArrowTableDelegate("ArrowAstropy")

        with self.assertRaises(ValueError):
            delegate.handleParameters(inMemoryDataset="not_an_astropy_table")
        with self.assertRaises(NotImplementedError):
            delegate.handleParameters(inMemoryDataset=table, parameters={"columns": [("a", "b")]})
        with self.assertRaises(AttributeError):
            delegate.getComponent(composite=table, componentName="nothing")

    @unittest.skipUnless(pd is not None, "Cannot test ParquetFormatterDataFrame without pandas.")
    def testWriteAstropyTableWithPandasIndexHint(self):
        # Stripping the index hint does not apply to an in-memory datastore.
        super().testWriteAstropyTableWithPandasIndexHint(testStrip=False)

1341 

1342 

@unittest.skipUnless(np is not None, "Cannot test ParquetFormatterArrowNumpy without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterArrowNumpy without pyarrow.")
class ParquetFormatterArrowNumpyTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, ArrowNumpy, using local file datastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run="test_run"
        )
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowNumpy", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Remove the per-test repository root."""
        removeTestTempDir(self.root)

    def testNumpyTable(self):
        """Round-trip a numpy table and exercise component reads and
        column-subset parameters.
        """
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})
        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        _checkNumpyTableEquality(tab1, tab2)
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(tab1.dtype.names))
        for i, name in enumerate(tab1.dtype.names):
            self.assertEqual(columns2[i], name)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(tab1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, ArrowNumpySchema(tab1.dtype))
        # Read just some columns a few different ways.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        _checkNumpyTableEquality(tab1[["a", "c"]], tab3)
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        _checkNumpyTableEquality(tab1[["a"]], tab4)
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        _checkNumpyTableEquality(tab1[["index", "a"]], tab5)
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        _checkNumpyTableEquality(tab1[["ddd"]], tab6)
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        _checkNumpyTableEquality(tab1[["a"]], tab7)
        tab8 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d??", "a*"]})
        _checkNumpyTableEquality(tab1[["ddd", "dtn", "dtu", "a"]], tab8)
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

    def testNumpyTableBigEndian(self):
        """Big-endian columns should round-trip."""
        tab1 = _makeSimpleNumpyTable(include_bigendian=True)

        self.butler.put(tab1, self.datasetType, dataId={})
        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        _checkNumpyTableEquality(tab1, tab2, has_bigendian=True)

    def testArrowNumpySchema(self):
        """Check ArrowNumpySchema construction, repr, and (in)equality."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        tab1_arrow = numpy_to_arrow(tab1)
        schema = ArrowNumpySchema.from_arrow(tab1_arrow.schema)

        self.assertIsInstance(schema.schema, np.dtype)
        self.assertEqual(repr(schema), repr(schema._dtype))
        self.assertNotEqual(schema, "not_a_schema")
        self.assertEqual(schema, schema)

        # Renaming a field should make the schemas unequal.
        tab2 = tab1.copy()
        names = list(tab2.dtype.names)
        names[0] = "index2"
        tab2.dtype.names = names
        schema2 = ArrowNumpySchema(tab2.dtype)
        self.assertNotEqual(schema2, schema)

    @unittest.skipUnless(pa is not None, "Cannot test arrow conversions without pyarrow.")
    def testNumpyDictConversions(self):
        """Numpy <-> numpy-dict <-> arrow conversions should round-trip,
        including the schema.
        """
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        # Verify that everything round-trips, including the schema.
        tab1_arrow = numpy_to_arrow(tab1)
        tab1_dict = arrow_to_numpy_dict(tab1_arrow)
        tab1_dict_arrow = numpy_dict_to_arrow(tab1_dict)

        self.assertEqual(tab1_arrow.schema, tab1_dict_arrow.schema)
        self.assertEqual(tab1_arrow, tab1_dict_arrow)

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testWriteNumpyTableReadAsArrowTable(self):
        """Numpy tables should read back correctly as arrow tables."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable")
        _checkNumpyTableEquality(tab1, arrow_to_numpy(tab2))

        # Check reading the columns.
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, tab2.schema.names)

        # Check reading the schema.
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowSchema"
        )
        self.assertEqual(schema2, tab2.schema)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteNumpyTableReadAsDataFrame(self):
        """Numpy tables should read back as equivalent dataframes."""
        tab1 = _makeSimpleNumpyTable()

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")

        # Converting this back to numpy gets confused with the index column
        # and changes the datatype of the string column.
        tab1_df = pd.DataFrame(tab1)
        self.assertTrue(tab1_df.equals(tab2))

        # Check reading the columns.
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="DataFrameIndex"
        )
        self.assertTrue(tab2.columns.equals(columns2))

        # Check reading the schema.
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="DataFrameSchema"
        )
        self.assertEqual(schema2, DataFrameSchema(tab2))

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteNumpyTableReadAsAstropyTable(self):
        """Numpy tables should read back as equivalent astropy tables."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")
        _checkNumpyTableEquality(tab1, tab2.as_array())

        # Check reading the columns.
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, list(tab2.columns.keys()))

        # Check reading the schema.
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowAstropySchema"
        )
        self.assertEqual(schema2, ArrowAstropySchema(tab2))

    def testWriteNumpyTableReadAsNumpyDict(self):
        """Numpy tables should read back as equivalent numpy dicts."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict")
        _checkNumpyTableEquality(tab1, _numpy_dict_to_numpy(tab2))

    # FIX: this test builds its bad columns from an astropy table and
    # astropy units, but previously had no astropy skip guard, so it would
    # error (not skip) when astropy is unavailable; every other
    # astropy-dependent test in this file carries this guard.
    @unittest.skipUnless(atable is not None, "Cannot test bad numpy columns without astropy.")
    def testBadNumpyColumnParquet(self):
        """Columns with mixed types or ragged rows must fail to serialize."""
        tab1 = _makeSimpleAstropyTable()

        # Make a column with mixed type.
        bad_col1 = [0.0] * len(tab1)
        bad_col1[1] = 0.0 * units.nJy
        bad_tab = tab1.copy()
        bad_tab["bad_col1"] = bad_col1

        bad_tab_np = bad_tab.as_array()

        # At the moment we cannot check that the correct note is added
        # to the exception, but that will be possible in the future.
        with self.assertRaises(RuntimeError):
            self.butler.put(bad_tab_np, self.datasetType, dataId={})

        # Make a column with ragged size.
        bad_col2 = [[0]] * len(tab1)
        bad_col2[1] = [0, 0]
        bad_tab = tab1.copy()
        bad_tab["bad_col2"] = bad_col2

        bad_tab_np = bad_tab.as_array()

        with self.assertRaises(RuntimeError):
            self.butler.put(bad_tab_np, self.datasetType, dataId={})

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteReadAstropyTableLossless(self):
        """A numpy-stored masked multidim table should round-trip
        losslessly through the ArrowAstropy storage class.
        """
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")
        _checkAstropyTableEquality(tab1, tab2)

1600 

1601 

@unittest.skipUnless(np is not None, "Cannot test ImMemoryDatastore with Numpy table without numpy.")
class InMemoryArrowNumpyDelegateTestCase(ParquetFormatterArrowNumpyTestCase):
    """Tests for InMemoryDatastore, using ArrowTableDelegate with
    Numpy table.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testBadNumpyColumnParquet(self):
        # This test does not raise for an in-memory datastore.
        pass

    def testBadInput(self):
        """The delegate should reject bad datasets, tuple column
        parameters, and unknown components.
        """
        table = _makeSimpleNumpyTable()
        delegate = ArrowTableDelegate("ArrowNumpy")

        with self.assertRaises(ValueError):
            delegate.handleParameters(inMemoryDataset="not_a_numpy_table")
        with self.assertRaises(NotImplementedError):
            delegate.handleParameters(inMemoryDataset=table, parameters={"columns": [("a", "b")]})
        with self.assertRaises(AttributeError):
            delegate.getComponent(composite=table, componentName="nothing")

    def testStorageClass(self):
        """Storage class lookup by type should resolve to ArrowNumpy with
        and without type comparison.
        """
        table = _makeSimpleNumpyTable()

        factory = StorageClassFactory()
        factory.addFromConfig(StorageClassConfig())

        for compare in (False, True):
            storage_class = factory.findStorageClass(type(table), compare_types=compare)
            # Force the name lookup to do name matching.
            storage_class._pytype = None
            self.assertEqual(storage_class.name, "ArrowNumpy")

1642 

1643 

@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterArrowTable without pyarrow.")
class ParquetFormatterArrowTableTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, ArrowTable, using local file datastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run="test_run"
        )
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowTable", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Remove the per-test butler root."""
        removeTestTempDir(self.root)

    def testArrowTable(self):
        """Round-trip an arrow table and exercise the columns, rowcount,
        and schema components as well as column-subset parameters.
        """
        tab1 = _makeSimpleArrowTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})
        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        # We convert to use the numpy testing framework to handle nan
        # comparisons.
        self.assertEqual(tab1.schema, tab2.schema)
        tab1_np = arrow_to_numpy(tab1)
        tab2_np = arrow_to_numpy(tab2)
        for col in tab1.column_names:
            np.testing.assert_array_equal(tab2_np[col], tab1_np[col])
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(tab1.schema.names))
        for i, name in enumerate(tab1.schema.names):
            self.assertEqual(columns2[i], name)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(tab1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, tab1.schema)
        # Read just some columns a few different ways: a list, a scalar,
        # duplicated names, and glob patterns.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        self.assertEqual(tab3, tab1.select(("a", "c")))
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        self.assertEqual(tab4, tab1.select(("a",)))
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        self.assertEqual(tab5, tab1.select(("index", "a")))
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        self.assertEqual(tab6, tab1.select(("ddd",)))
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        self.assertEqual(tab7, tab1.select(("a",)))
        tab8 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a*", "d??"]})
        self.assertEqual(tab8, tab1.select(("a", "ddd", "dtn", "dtu")))
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

    def testEmptyArrowTable(self):
        """Round-trip a zero-row arrow table and check conversions to
        numpy, pandas, and astropy all preserve the (empty) contents.
        """
        data = _makeSimpleNumpyTable()
        type_list = _numpy_dtype_to_arrow_types(data.dtype)

        schema = pa.schema(type_list)
        # One empty column per schema field.
        arrays = [[]] * len(schema.names)

        tab1 = pa.Table.from_arrays(arrays, schema=schema)

        self.butler.put(tab1, self.datasetType, dataId={})
        tab2 = self.butler.get(self.datasetType, dataId={})
        self.assertEqual(tab2, tab1)

        tab1_numpy = arrow_to_numpy(tab1)
        self.assertEqual(len(tab1_numpy), 0)
        tab1_numpy_arrow = numpy_to_arrow(tab1_numpy)
        self.assertEqual(tab1_numpy_arrow, tab1)

        tab1_pandas = arrow_to_pandas(tab1)
        self.assertEqual(len(tab1_pandas), 0)
        tab1_pandas_arrow = pandas_to_arrow(tab1_pandas)
        # Unfortunately, string/byte columns get mangled when translated
        # through empty pandas dataframes.
        self.assertEqual(
            tab1_pandas_arrow.select(("index", "a", "b", "c", "ddd")),
            tab1.select(("index", "a", "b", "c", "ddd")),
        )

        tab1_astropy = arrow_to_astropy(tab1)
        self.assertEqual(len(tab1_astropy), 0)
        tab1_astropy_arrow = astropy_to_arrow(tab1_astropy)
        self.assertEqual(tab1_astropy_arrow, tab1)

    def testEmptyArrowTableMultidim(self):
        """Round-trip a zero-row arrow table whose schema carries the
        multidimensional-column metadata.
        """
        data = _makeSimpleNumpyTable(include_multidim=True)
        type_list = _numpy_dtype_to_arrow_types(data.dtype)

        # Attach the multidim shape metadata that numpy_to_arrow would add.
        md = {}
        for name in data.dtype.names:
            _append_numpy_multidim_metadata(md, name, data.dtype[name])

        schema = pa.schema(type_list, metadata=md)
        arrays = [[]] * len(schema.names)

        tab1 = pa.Table.from_arrays(arrays, schema=schema)

        self.butler.put(tab1, self.datasetType, dataId={})
        tab2 = self.butler.get(self.datasetType, dataId={})
        self.assertEqual(tab2, tab1)

        tab1_numpy = arrow_to_numpy(tab1)
        self.assertEqual(len(tab1_numpy), 0)
        tab1_numpy_arrow = numpy_to_arrow(tab1_numpy)
        self.assertEqual(tab1_numpy_arrow, tab1)

        tab1_astropy = arrow_to_astropy(tab1)
        self.assertEqual(len(tab1_astropy), 0)
        tab1_astropy_arrow = astropy_to_arrow(tab1_astropy)
        self.assertEqual(tab1_astropy_arrow, tab1)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteArrowTableReadAsSingleIndexDataFrame(self):
        """Write via the ArrowTable dataset type; read back as a
        single-index pandas DataFrame and check its components.
        """
        df1, allColumns = _makeSingleIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        # Read back out as a dataframe.
        df2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")
        self.assertTrue(df1.equals(df2))

        # Read back out as an arrow table, convert to dataframe.
        tab3 = self.butler.get(self.datasetType, dataId={})
        df3 = arrow_to_pandas(tab3)
        self.assertTrue(df1.equals(df3))

        # Check reading the columns.
        columns = df2.reset_index().columns
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="DataFrameIndex"
        )
        # We check the set because pandas reorders the columns.
        self.assertEqual(set(columns2.to_list()), set(columns.to_list()))

        # Check reading the schema.
        schema = DataFrameSchema(df1)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="DataFrameSchema"
        )
        self.assertEqual(schema2, schema)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteArrowTableReadAsMultiIndexDataFrame(self):
        """Write via the ArrowTable dataset type; read back as a
        multi-index pandas DataFrame and check its components.
        """
        df1 = _makeMultiIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        # Read back out as a dataframe.
        df2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")
        self.assertTrue(df1.equals(df2))

        # Read back out as an arrow table, convert to dataframe.
        atab3 = self.butler.get(self.datasetType, dataId={})
        df3 = arrow_to_pandas(atab3)
        self.assertTrue(df1.equals(df3))

        # Check reading the columns.
        columns = df2.columns
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="DataFrameIndex"
        )
        self.assertTrue(columns2.equals(columns))

        # Check reading the schema.
        schema = DataFrameSchema(df1)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="DataFrameSchema"
        )
        self.assertEqual(schema2, schema)

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteArrowTableReadAsAstropyTable(self):
        """Write via the ArrowTable dataset type; read back as an astropy
        table and check components, descriptions, and units.
        """
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        # Read back out as an astropy table.
        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")
        _checkAstropyTableEquality(tab1, tab2)

        # Read back out as an arrow table, convert to astropy table.
        atab3 = self.butler.get(self.datasetType, dataId={})
        tab3 = arrow_to_astropy(atab3)
        _checkAstropyTableEquality(tab1, tab3)

        # Check reading the columns.
        columns = list(tab2.columns.keys())
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, columns)

        # Check reading the schema.
        schema = ArrowAstropySchema(tab1)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowAstropySchema"
        )
        self.assertEqual(schema2, schema)

        # Check the schema conversions and units.  Empty description/unit
        # metadata values mean "not set" and map to None / no unit.
        arrow_schema = schema.to_arrow_schema()
        for name in arrow_schema.names:
            field_metadata = arrow_schema.field(name).metadata
            if (
                b"description" in field_metadata
                and (description := field_metadata[b"description"].decode("UTF-8")) != ""
            ):
                self.assertEqual(schema2.schema[name].description, description)
            else:
                self.assertIsNone(schema2.schema[name].description)
            if b"unit" in field_metadata and (unit := field_metadata[b"unit"].decode("UTF-8")) != "":
                self.assertEqual(schema2.schema[name].unit, units.Unit(unit))

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
    def testWriteArrowTableReadAsNumpyTable(self):
        """Write via the ArrowTable dataset type; read back as a numpy
        structured array and check its components.
        """
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        # Read back out as a numpy table.
        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy")
        _checkNumpyTableEquality(tab1, tab2)

        # Read back out as an arrow table, convert to numpy table.
        atab3 = self.butler.get(self.datasetType, dataId={})
        tab3 = arrow_to_numpy(atab3)
        _checkNumpyTableEquality(tab1, tab3)

        # Check reading the columns.
        columns = list(tab2.dtype.names)
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, columns)

        # Check reading the schema.
        schema = ArrowNumpySchema(tab1.dtype)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowNumpySchema"
        )
        self.assertEqual(schema2, schema)

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
    def testWriteArrowTableReadAsNumpyDict(self):
        """Write via the ArrowTable dataset type; read back as a dict of
        numpy arrays.
        """
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict")
        tab2_numpy = _numpy_dict_to_numpy(tab2)
        _checkNumpyTableEquality(tab1, tab2_numpy)

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteReadAstropyTableLossless(self):
        """Round-trip an astropy table (with multidim and masked columns)
        through the butler and confirm nothing is lost.
        """
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

        _checkAstropyTableEquality(tab1, tab2)

1920 

@unittest.skipUnless(pa is not None, "Cannot test InMemoryDatastore with ArrowTable without pyarrow.")
class InMemoryArrowTableDelegateTestCase(ParquetFormatterArrowTableTestCase):
    """Tests for InMemoryDatastore, using ArrowTableDelegate."""

    # Fixed typo in the skip message above: "ArroWTable" -> "ArrowTable".
    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testBadInput(self):
        """Check ArrowTableDelegate error handling for bad inputs."""
        tab1 = _makeSimpleArrowTable()
        delegate = ArrowTableDelegate("ArrowTable")

        # A dataset that is not a table at all is rejected.
        with self.assertRaises(ValueError):
            delegate.handleParameters(inMemoryDataset="not_an_arrow_table")

        # Tuple-style (multi-level) column selections are not supported.
        with self.assertRaises(NotImplementedError):
            delegate.handleParameters(inMemoryDataset=tab1, parameters={"columns": [("a", "b")]})

        # Unknown component names raise.
        with self.assertRaises(AttributeError):
            delegate.getComponent(composite=tab1, componentName="nothing")

    def testStorageClass(self):
        """Check that an arrow table maps to the ArrowTable storage class
        both with and without type comparison.
        """
        tab1 = _makeSimpleArrowTable()

        factory = StorageClassFactory()
        factory.addFromConfig(StorageClassConfig())

        storageClass = factory.findStorageClass(type(tab1), compare_types=False)
        # Force the name lookup to do name matching.
        storageClass._pytype = None
        self.assertEqual(storageClass.name, "ArrowTable")

        storageClass = factory.findStorageClass(type(tab1), compare_types=True)
        # Force the name lookup to do name matching.
        storageClass._pytype = None
        self.assertEqual(storageClass.name, "ArrowTable")

1955 

1956 

@unittest.skipUnless(np is not None, "Cannot test ParquetFormatterArrowNumpy without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterArrowNumpy without pyarrow.")
class ParquetFormatterArrowNumpyDictTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, ArrowNumpyDict, using local file store."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run="test_run"
        )
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowNumpyDict", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Remove the per-test butler root."""
        removeTestTempDir(self.root)

    def testNumpyDict(self):
        """Round-trip a dict of numpy arrays and exercise the columns,
        rowcount, and schema components plus column-subset parameters.
        """
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})
        # Read the whole table.
        dict2 = self.butler.get(self.datasetType, dataId={})
        _checkNumpyDictEquality(dict1, dict2)
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(dict1.keys()))
        for name in dict1:
            self.assertIn(name, columns2)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(dict1["a"]))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, ArrowNumpySchema(tab1.dtype))
        # Read just some columns a few different ways: a list, a scalar,
        # duplicated names, and glob patterns.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        subdict = {key: dict1[key] for key in ["a", "c"]}
        _checkNumpyDictEquality(subdict, tab3)
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        subdict = {key: dict1[key] for key in ["a"]}
        _checkNumpyDictEquality(subdict, tab4)
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        subdict = {key: dict1[key] for key in ["index", "a"]}
        _checkNumpyDictEquality(subdict, tab5)
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        subdict = {key: dict1[key] for key in ["ddd"]}
        _checkNumpyDictEquality(subdict, tab6)
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        subdict = {key: dict1[key] for key in ["a"]}
        _checkNumpyDictEquality(subdict, tab7)
        tab8 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d??", "a*"]})
        subdict = {key: dict1[key] for key in ["ddd", "dtn", "dtu", "a"]}
        _checkNumpyDictEquality(subdict, tab8)
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testWriteNumpyDictReadAsArrowTable(self):
        """Write a numpy dict; read back as an arrow table."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable")

        tab2_dict = arrow_to_numpy_dict(tab2)

        _checkNumpyDictEquality(dict1, tab2_dict)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteNumpyDictReadAsDataFrame(self):
        """Write a numpy dict; read back as a pandas DataFrame."""
        tab1 = _makeSimpleNumpyTable()
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")

        # The order of the dict may get mixed up, so we need to check column
        # by column. We also need to do this in dataframe form because pandas
        # changes the datatype of the string column.
        tab1_df = pd.DataFrame(tab1)

        self.assertEqual(set(tab1_df.columns), set(tab2.columns))
        for col in tab1_df.columns:
            self.assertTrue(np.all(tab1_df[col].values == tab2[col].values))

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteNumpyDictReadAsAstropyTable(self):
        """Write a numpy dict; read back as an astropy table."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")
        tab2_dict = _astropy_to_numpy_dict(tab2)

        _checkNumpyDictEquality(dict1, tab2_dict)

    def testWriteNumpyDictReadAsNumpyTable(self):
        """Write a numpy dict; read back as a numpy structured array."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy")
        tab2_dict = _numpy_to_numpy_dict(tab2)

        _checkNumpyDictEquality(dict1, tab2_dict)

    def testWriteNumpyDictBad(self):
        """Check that malformed dicts are rejected on put: non-array
        values, mismatched lengths, non-numpy values, object dtypes.
        """
        dict1 = {"a": 4, "b": np.ndarray([1])}
        with self.assertRaises(RuntimeError):
            self.butler.put(dict1, self.datasetType, dataId={})

        dict2 = {"a": np.zeros(4), "b": np.zeros(5)}
        with self.assertRaises(RuntimeError):
            self.butler.put(dict2, self.datasetType, dataId={})

        dict3 = {"a": [0] * 5, "b": np.zeros(5)}
        with self.assertRaises(RuntimeError):
            self.butler.put(dict3, self.datasetType, dataId={})

        dict4 = {"a": np.zeros(4), "b": np.zeros(4, dtype="O")}
        with self.assertRaises(RuntimeError):
            self.butler.put(dict4, self.datasetType, dataId={})

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteReadAstropyTableLossless(self):
        """Round-trip an astropy table (with multidim and masked columns)
        through the butler and confirm nothing is lost.
        """
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

        _checkAstropyTableEquality(tab1, tab2)

2104 

2105 

@unittest.skipUnless(np is not None, "Cannot test InMemoryDatastore with NumpyDict without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test InMemoryDatastore with NumpyDict without pyarrow.")
class InMemoryNumpyDictDelegateTestCase(ParquetFormatterArrowNumpyDictTestCase):
    """Tests for InMemoryDatastore, using ArrowTableDelegate with
    Numpy dict.
    """

    # In-memory datastore configuration replaces the file-based one used
    # by the parent test case; all inherited tests run against it.
    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testWriteNumpyDictBad(self):
        # The sub-type checking is not done on in-memory datastore.
        pass

2118 

2119 

@unittest.skipUnless(pa is not None, "Cannot test ArrowSchema without pyarrow.")
class ParquetFormatterArrowSchemaTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, ArrowSchema, using local file datastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run="test_run"
        )
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowSchema", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Remove the per-test butler root."""
        removeTestTempDir(self.root)

    def _makeTestSchema(self):
        """Construct a pyarrow schema covering a variety of field types
        and metadata combinations (with/without description and unit).
        """
        schema = pa.schema(
            [
                pa.field(
                    "int32",
                    pa.int32(),
                    nullable=False,
                    metadata={
                        "description": "32-bit integer",
                        "unit": "",
                    },
                ),
                pa.field(
                    "int64",
                    pa.int64(),
                    nullable=False,
                    metadata={
                        "description": "64-bit integer",
                        "unit": "",
                    },
                ),
                pa.field(
                    "uint64",
                    pa.uint64(),
                    nullable=False,
                    metadata={
                        "description": "64-bit unsigned integer",
                        "unit": "",
                    },
                ),
                pa.field(
                    "float32",
                    pa.float32(),
                    nullable=False,
                    metadata={
                        "description": "32-bit float",
                        "unit": "count",
                    },
                ),
                pa.field(
                    "float64",
                    pa.float64(),
                    nullable=False,
                    metadata={
                        "description": "64-bit float",
                        "unit": "nJy",
                    },
                ),
                pa.field(
                    "fixed_size_list",
                    pa.list_(pa.float64(), list_size=10),
                    nullable=False,
                    metadata={
                        "description": "Fixed size list of 64-bit floats.",
                        "unit": "nJy",
                    },
                ),
                pa.field(
                    "variable_size_list",
                    pa.list_(pa.float64()),
                    nullable=False,
                    metadata={
                        "description": "Variable size list of 64-bit floats.",
                        "unit": "nJy",
                    },
                ),
                # One of these fields will have no description.
                pa.field(
                    "string",
                    pa.string(),
                    nullable=False,
                    metadata={
                        "unit": "",
                    },
                ),
                # One of these fields will have no metadata.
                pa.field(
                    "binary",
                    pa.binary(),
                    nullable=False,
                ),
            ]
        )

        return schema

    def testArrowSchema(self):
        """Round-trip an ArrowSchema through the butler."""
        schema1 = self._makeTestSchema()
        self.butler.put(schema1, self.datasetType, dataId={})

        schema2 = self.butler.get(self.datasetType, dataId={})
        self.assertEqual(schema2, schema1)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe schema without pandas.")
    def testWriteArrowSchemaReadAsDataFrameSchema(self):
        """Write an ArrowSchema; read back as a DataFrameSchema."""
        schema1 = self._makeTestSchema()
        self.butler.put(schema1, self.datasetType, dataId={})

        df_schema1 = DataFrameSchema.from_arrow(schema1)

        df_schema2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrameSchema")
        self.assertEqual(df_schema2, df_schema1)

    @unittest.skipUnless(atable is not None, "Cannot test reading as an astropy schema without astropy.")
    def testWriteArrowSchemaReadAsArrowAstropySchema(self):
        """Write an ArrowSchema; read back as an ArrowAstropySchema and
        check that descriptions and units survive the conversion.
        """
        schema1 = self._makeTestSchema()
        self.butler.put(schema1, self.datasetType, dataId={})

        ap_schema1 = ArrowAstropySchema.from_arrow(schema1)

        ap_schema2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropySchema")
        self.assertEqual(ap_schema2, ap_schema1)

        # Confirm that the ap_schema2 has the unit/description we expect.
        # Empty metadata values mean "not set" and map to None / no unit.
        for name in schema1.names:
            field_metadata = schema1.field(name).metadata
            if field_metadata is None:
                continue
            if (
                b"description" in field_metadata
                and (description := field_metadata[b"description"].decode("UTF-8")) != ""
            ):
                self.assertEqual(ap_schema2.schema[name].description, description)
            else:
                self.assertIsNone(ap_schema2.schema[name].description)
            if b"unit" in field_metadata and (unit := field_metadata[b"unit"].decode("UTF-8")) != "":
                self.assertEqual(ap_schema2.schema[name].unit, units.Unit(unit))

    # Fixed skip guard: this test requires numpy (ArrowNumpySchema), but it
    # previously checked `atable is not None` -- a copy-paste slip from the
    # astropy test above; the message already said "without numpy".
    @unittest.skipUnless(np is not None, "Cannot test reading as a numpy schema without numpy.")
    def testWriteArrowSchemaReadAsArrowNumpySchema(self):
        """Write an ArrowSchema; read back as an ArrowNumpySchema."""
        schema1 = self._makeTestSchema()
        self.butler.put(schema1, self.datasetType, dataId={})

        np_schema1 = ArrowNumpySchema.from_arrow(schema1)

        np_schema2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpySchema")
        self.assertEqual(np_schema2, np_schema1)

2281 

2282 

@unittest.skipUnless(pa is not None, "Cannot test InMemoryDatastore with ArrowSchema without pyarrow.")
class InMemoryArrowSchemaDelegateTestCase(ParquetFormatterArrowSchemaTestCase):
    """Tests for InMemoryDatastore and ArrowSchema."""

    # In-memory datastore configuration replaces the file-based one used
    # by the parent test case; all inherited tests run against it.
    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

2288 

2289 

@unittest.skipUnless(pa is not None, "Cannot test S3 without pyarrow.")
@unittest.skipUnless(boto3 is not None, "Cannot test S3 without boto3.")
@unittest.skipUnless(fsspec is not None, "Cannot test S3 without fsspec.")
@unittest.skipUnless(s3fs is not None, "Cannot test S3 without s3fs.")
class ParquetFormatterArrowTableS3TestCase(unittest.TestCase):
    """Tests for arrow table/parquet with S3."""

    # Code is adapted from test_butler.py
    # NOTE(review): several of these class attributes (fullConfigKey,
    # validationCanFail, registryStr, and the template-style datastoreStr/
    # datastoreName) appear to be carried over from test_butler.py; setUp
    # overwrites the datastore ones with real values -- confirm the rest
    # are still needed.
    configFile = os.path.join(TESTDIR, "config/basic/butler-s3store.yaml")
    fullConfigKey = None
    validationCanFail = True

    bucketName = "anybucketname"

    root = "butlerRoot/"

    datastoreStr = [f"datastore={root}"]

    datastoreName = ["FileDatastore@s3://{bucketName}/{root}"]

    registryStr = "/gen3.sqlite3"

    mock_aws = mock_aws()

    def setUp(self):
        """Create a mocked S3 bucket and a butler repo inside it."""
        self.root = makeTestTempDir(TESTDIR)

        config = Config(self.configFile)
        uri = ResourcePath(config[".datastore.datastore.root"])
        # Bucket name comes from the configured datastore root URI.
        self.bucketName = uri.netloc

        # Enable S3 mocking of tests.
        self.enterContext(clean_test_environment_for_s3())
        self.mock_aws.start()

        rooturi = f"s3://{self.bucketName}/{self.root}"
        config.update({"datastore": {"datastore": {"root": rooturi}}})

        # need local folder to store registry database
        self.reg_dir = makeTestTempDir(TESTDIR)
        config["registry", "db"] = f"sqlite:///{self.reg_dir}/gen3.sqlite3"

        # MOTO needs to know that we expect Bucket bucketname to exist
        # (this used to be the class attribute bucketName)
        s3 = boto3.resource("s3")
        s3.create_bucket(Bucket=self.bucketName)

        self.datastoreStr = [f"datastore='{rooturi}'"]
        self.datastoreName = [f"FileDatastore@{rooturi}"]
        Butler.makeRepo(rooturi, config=config, forceConfigRoot=False)
        self.tmpConfigFile = posixpath.join(rooturi, "butler.yaml")

        self.butler = Butler(self.tmpConfigFile, writeable=True, run="test_run")
        self.enterContext(self.butler)

        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowTable", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Empty and delete the mocked bucket, stop the S3 mock, and
        remove the temporary registry and root directories.
        """
        s3 = boto3.resource("s3")
        bucket = s3.Bucket(self.bucketName)
        try:
            bucket.objects.all().delete()
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                # the key was not reachable - pass
                pass
            else:
                raise

        # The bucket itself can only be deleted once it is empty.
        bucket = s3.Bucket(self.bucketName)
        bucket.delete()

        # Stop the S3 mock.
        self.mock_aws.stop()

        if self.reg_dir is not None and os.path.exists(self.reg_dir):
            shutil.rmtree(self.reg_dir, ignore_errors=True)

        if os.path.exists(self.root):
            shutil.rmtree(self.root, ignore_errors=True)

    def testArrowTableS3(self):
        """Round-trip an arrow table through the S3-backed datastore and
        exercise the columns, rowcount, and schema components plus
        column-subset parameters.
        """
        tab1 = _makeSimpleArrowTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        # We convert to use the numpy testing framework to handle nan
        # comparisons.
        self.assertEqual(tab1.schema, tab2.schema)
        tab1_np = arrow_to_numpy(tab1)
        tab2_np = arrow_to_numpy(tab2)
        for col in tab1.column_names:
            np.testing.assert_array_equal(tab2_np[col], tab1_np[col])
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(tab1.schema.names))
        for i, name in enumerate(tab1.schema.names):
            self.assertEqual(columns2[i], name)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(tab1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, tab1.schema)
        # Read just some columns a few different ways: a list, a scalar,
        # and duplicated names.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        self.assertEqual(tab3, tab1.select(("a", "c")))
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        self.assertEqual(tab4, tab1.select(("a",)))
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        self.assertEqual(tab5, tab1.select(("index", "a")))
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        self.assertEqual(tab6, tab1.select(("ddd",)))
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        self.assertEqual(tab7, tab1.select(("a",)))
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

2415 

2416 

@unittest.skipUnless(np is not None, "Cannot test compute_row_group_size without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test compute_row_group_size without pyarrow.")
class ComputeRowGroupSizeTestCase(unittest.TestCase):
    """Tests for compute_row_group_size."""

    def testRowGroupSizeNoMetadata(self):
        """Row-group size computed from a schema that carries no metadata."""
        data = _makeSimpleNumpyTable(include_multidim=True)

        # Assemble the arrow table by hand: the numpy_to_arrow convenience
        # function would attach metadata, which this test must avoid.
        arrow_types = _numpy_dtype_to_arrow_types(data.dtype)
        arrow_schema = pa.schema(arrow_types)
        arrow_arrays = _numpy_style_arrays_to_arrow_arrays(
            data.dtype,
            len(data),
            data,
            arrow_schema,
        )
        table = pa.Table.from_arrays(arrow_arrays, schema=arrow_schema)

        size = compute_row_group_size(table.schema)

        self.assertGreater(size, 1_000_000)
        self.assertLess(size, 2_000_000)

    def testRowGroupSizeWithMetadata(self):
        """Row-group size computed from a schema that carries metadata."""
        data = _makeSimpleNumpyTable(include_multidim=True)
        table = numpy_to_arrow(data)

        size = compute_row_group_size(table.schema)

        self.assertGreater(size, 1_000_000)
        self.assertLess(size, 2_000_000)

    def testRowGroupSizeTinyTable(self):
        """A one-row, one-boolean-column table still yields a large size."""
        data = np.zeros(1, dtype=[("a", np.bool_)])
        table = numpy_to_arrow(data)

        size = compute_row_group_size(table.schema)

        self.assertGreater(size, 1_000_000)

    @unittest.skipUnless(pd is not None, "Cannot run testRowGroupSizeDataFrameWithLists without pandas.")
    def testRowGroupSizeDataFrameWithLists(self):
        """Row-group size for a DataFrame containing list-valued columns."""
        df = pd.DataFrame({"a": np.zeros(10), "b": [[0, 0]] * 10, "c": [[0.0, 0.0]] * 10, "d": [[]] * 10})
        table = pandas_to_arrow(df)
        size = compute_row_group_size(table.schema)

        self.assertGreater(size, 1_000_000)

2468 

2469 

def _checkAstropyTableEquality(table1, table2, skip_units=False, has_bigendian=False):
    """Check if two astropy tables have the same columns/values.

    Parameters
    ----------
    table1 : `astropy.table.Table`
        First table to compare.
    table2 : `astropy.table.Table`
        Second table to compare.
    skip_units : `bool`
        If `True`, do not compare column units, descriptions, or formats.
    has_bigendian : `bool`
        If `True`, allow the tables to differ in byte order.
    """
    if has_bigendian:
        for name in table1.dtype.names:
            # Compare types only, normalizing both sides to big-endian so
            # that byte-order differences are ignored.
            assert table1.dtype[name].newbyteorder(">") == table2.dtype[name].newbyteorder(">")
    else:
        assert table1.dtype == table2.dtype

    # Strip provenance before comparison; this mutates the meta dicts
    # in place.
    DatasetProvenance.strip_provenance_from_flat_dict(table1.meta)
    DatasetProvenance.strip_provenance_from_flat_dict(table2.meta)
    assert table1.meta == table2.meta

    if not skip_units:
        for name in table1.columns:
            assert table1[name].unit == table2[name].unit
            assert table1[name].description == table2[name].description
            assert table1[name].format == table2[name].format

    for name in table1.columns:
        column1 = table1[name]
        column2 = table2[name]
        # Masked and regular columns are compared after filling; both
        # tables must agree on whether a given column is masked.
        if isinstance(column1, atable.column.MaskedColumn):
            assert isinstance(column2, atable.column.MaskedColumn)
            filled1 = column1.filled()
            filled2 = column2.filled()
            np.testing.assert_array_equal(filled1, filled2)
            # For masked columns also compare the filled data as plain
            # arrays, plus the masks themselves.
            np.testing.assert_array_equal(np.array(filled1), np.array(filled2))
            np.testing.assert_array_equal(column1.mask, column2.mask)
        else:
            assert not isinstance(column2, atable.column.MaskedColumn)
            np.testing.assert_array_equal(np.array(column1), np.array(column2))

2516 

2517 

2518def _checkNumpyTableEquality(table1, table2, has_bigendian=False): 

2519 """Check if two numpy tables have the same columns/values 

2520 

2521 Parameters 

2522 ---------- 

2523 table1 : `numpy.ndarray` 

2524 table2 : `numpy.ndarray` 

2525 has_bigendian : `bool` 

2526 """ 

2527 assert table1.dtype.names == table2.dtype.names 

2528 for name in table1.dtype.names: 

2529 if not has_bigendian: 

2530 assert table1.dtype[name] == table2.dtype[name] 

2531 else: 

2532 # Only check type matches, force to little-endian. 

2533 assert table1.dtype[name].newbyteorder(">") == table2.dtype[name].newbyteorder(">") 

2534 assert np.all(table1 == table2) 

2535 

2536 

2537def _checkNumpyDictEquality(dict1, dict2): 

2538 """Check if two numpy dicts have the same columns/values. 

2539 

2540 Parameters 

2541 ---------- 

2542 dict1 : `dict` [`str`, `np.ndarray`] 

2543 dict2 : `dict` [`str`, `np.ndarray`] 

2544 """ 

2545 assert set(dict1.keys()) == set(dict2.keys()) 

2546 for name in dict1: 

2547 assert dict1[name].dtype == dict2[name].dtype 

2548 assert np.all(dict1[name] == dict2[name]) 

2549 

2550 

if __name__ == "__main__":
    # Allow running this test module directly with the unittest runner.
    unittest.main()