Coverage for tests / test_parquet.py: 17%
1347 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Tests for ParquetFormatter.
30Tests in this module are disabled unless pandas and pyarrow are importable.
31"""
33import datetime
34import os
35import posixpath
36import shutil
37import unittest
38import uuid
40try:
41 import pyarrow as pa
42except ImportError:
43 pa = None
44try:
45 import astropy.table as atable
46 from astropy import units
47except ImportError:
48 atable = None
49try:
50 import numpy as np
51except ImportError:
52 np = None
53try:
54 import pandas as pd
55except ImportError:
56 pd = None
58try:
59 import boto3
60 import botocore
62 from lsst.resources.s3utils import clean_test_environment_for_s3
64 try:
65 from moto import mock_aws # v5
66 except ImportError:
67 from moto import mock_s3 as mock_aws
68except ImportError:
69 boto3 = None
71try:
72 import fsspec
73except ImportError:
74 fsspec = None
76try:
77 import s3fs
78except ImportError:
79 s3fs = None
82from lsst.daf.butler import (
83 Butler,
84 Config,
85 DatasetProvenance,
86 DatasetRef,
87 DatasetType,
88 FileDataset,
89 StorageClassConfig,
90 StorageClassFactory,
91)
92from lsst.resources import ResourcePath
94try:
95 from lsst.daf.butler.delegates.arrowtable import ArrowTableDelegate
96except ImportError:
97 pa = None
99try:
100 from lsst.daf.butler.formatters.parquet import (
101 ASTROPY_PANDAS_INDEX_KEY,
102 ArrowAstropySchema,
103 ArrowNumpySchema,
104 DataFrameSchema,
105 ParquetFormatter,
106 _append_numpy_multidim_metadata,
107 _astropy_to_numpy_dict,
108 _numpy_dict_to_numpy,
109 _numpy_dtype_to_arrow_types,
110 _numpy_style_arrays_to_arrow_arrays,
111 _numpy_to_numpy_dict,
112 add_pandas_index_to_astropy,
113 arrow_to_astropy,
114 arrow_to_numpy,
115 arrow_to_numpy_dict,
116 arrow_to_pandas,
117 astropy_to_arrow,
118 astropy_to_pandas,
119 compute_row_group_size,
120 numpy_dict_to_arrow,
121 numpy_to_arrow,
122 pandas_to_arrow,
123 pandas_to_astropy,
124 )
125except ImportError:
126 pa = None
127 pd = None
128 atable = None
129 np = None
130from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
132TESTDIR = os.path.abspath(os.path.dirname(__file__))
135def _makeSimpleNumpyTable(include_multidim=False, include_bigendian=False):
136 """Make a simple numpy table with random data.
138 Parameters
139 ----------
140 include_multidim : `bool`
141 Include multi-dimensional columns.
142 include_bigendian : `bool`
143 Include big-endian columns.
145 Returns
146 -------
147 numpyTable : `numpy.ndarray`
148 """
149 nrow = 5
151 dtype = [
152 ("index", "i4"),
153 ("a", "f8"),
154 ("b", "f8"),
155 ("c", "f8"),
156 ("ddd", "f8"),
157 ("f", "i8"),
158 ("strcol", "U10"),
159 ("bytecol", "S10"),
160 ("dtn", "datetime64[ns]"),
161 ("dtu", "datetime64[us]"),
162 ]
164 if include_multidim:
165 dtype.extend(
166 [
167 ("d1", "f4", (5,)),
168 ("d2", "i8", (5, 10)),
169 ("d3", "f8", (5, 10)),
170 ]
171 )
173 if include_bigendian:
174 dtype.extend([("a_bigendian", ">f8"), ("f_bigendian", ">i8")])
176 data = np.zeros(nrow, dtype=dtype)
177 data["index"][:] = np.arange(nrow)
178 data["a"] = np.random.randn(nrow)
179 data["b"] = np.random.randn(nrow)
180 data["c"] = np.random.randn(nrow)
181 data["ddd"] = np.random.randn(nrow)
182 data["f"] = np.arange(nrow) * 10
183 data["strcol"][:] = "teststring"
184 data["bytecol"][:] = "teststring"
185 data["dtn"] = datetime.datetime.fromisoformat("2024-07-23")
186 data["dtu"] = datetime.datetime.fromisoformat("2024-07-23")
188 if include_multidim:
189 data["d1"] = np.random.randn(data["d1"].size).reshape(data["d1"].shape)
190 data["d2"] = np.arange(data["d2"].size).reshape(data["d2"].shape)
191 data["d3"] = np.asfortranarray(np.random.randn(data["d3"].size).reshape(data["d3"].shape))
193 if include_bigendian:
194 data["a_bigendian"][:] = data["a"]
195 data["f_bigendian"][:] = data["f"]
197 return data
def _makeSingleIndexDataFrame(include_masked=False, include_lists=False):
    """Make a single index data frame for testing.

    Parameters
    ----------
    include_masked : `bool`
        Include masked columns.
    include_lists : `bool`
        Include list columns.

    Returns
    -------
    dataFrame : `~pandas.DataFrame`
        The test dataframe.
    allColumns : `list` [`str`]
        List of all the columns (including index columns).
    """
    frame = pd.DataFrame(_makeSimpleNumpyTable()).set_index("index")
    n = len(frame)

    if include_masked:
        # Nullable integer, float32, and string columns, with one null row.
        frame["m1"] = pd.array(np.arange(n), dtype=pd.Int64Dtype())
        frame["m2"] = pd.array(np.arange(n), dtype=np.float32)
        frame["mstrcol"] = pd.array(np.array(["text"] * n))
        frame.loc[1, ["m1", "m2", "mstrcol"]] = None
        # A 64-bit value too large to survive a float64 round trip.
        frame.loc[0, "m1"] = 1649900760361600113

    if include_lists:
        # Fixed-length int/float list columns plus an all-empty one.
        frame["l1"] = [[0, 0]] * n
        frame["l2"] = [[0.0, 0.0]] * n
        frame["l3"] = [[]] * n

    combined = frame.columns.append(pd.Index(frame.index.names))

    return frame, combined
242def _makeMultiIndexDataFrame():
243 """Make a multi-index data frame for testing.
245 Returns
246 -------
247 dataFrame : `~pandas.DataFrame`
248 The test dataframe.
249 """
250 columns = pd.MultiIndex.from_tuples(
251 [
252 ("g", "a"),
253 ("g", "b"),
254 ("g", "c"),
255 ("r", "a"),
256 ("r", "b"),
257 ("r", "c"),
258 ],
259 names=["filter", "column"],
260 )
261 df = pd.DataFrame(np.random.randn(5, 6), index=np.arange(5, dtype=int), columns=columns)
263 return df
def _makeSimpleAstropyTable(include_multidim=False, include_masked=False, include_bigendian=False):
    """Make an astropy table for testing.

    Parameters
    ----------
    include_multidim : `bool`
        Include multi-dimensional columns.
    include_masked : `bool`
        Include masked columns.
    include_bigendian : `bool`
        Include big-endian columns.

    Returns
    -------
    astropyTable : `astropy.table.Table`
        The test table.
    """
    table = atable.Table(
        _makeSimpleNumpyTable(include_multidim=include_multidim, include_bigendian=include_bigendian)
    )
    # Attach units and descriptions to a couple of columns.
    for name, unit, desc in (
        ("a", units.degree, "Description of column a"),
        ("b", units.meter, "Description of column b"),
    ):
        table[name].unit = unit
        table[name].description = desc

    if include_masked:
        nrow = len(table)
        mask = np.zeros(nrow, dtype=bool)
        mask[1] = True
        # Masked entries carry the sentinel value in the underlying data
        # so it can be checked after serialization.

        # Masked 64-bit integer; row 0 holds a value too large for float64.
        ints = np.arange(nrow, dtype="i8")
        ints[mask] = -1
        ints[0] = 1649900760361600113
        table["m_i8"] = np.ma.masked_array(data=ints, mask=mask, fill_value=-1)

        # Masked 32- and 64-bit floats, each paired with an unmasked
        # column containing the same NaN-bearing data.
        for code in ("f4", "f8"):
            floats = np.arange(nrow, dtype=code)
            floats[mask] = np.nan
            table[f"m_{code}"] = np.ma.masked_array(data=floats, mask=mask, fill_value=np.nan)
            table[f"um_{code}"] = floats

        # Masked boolean.
        bools = np.zeros(nrow, dtype=np.bool_)
        bools[mask] = True
        table["m_bool"] = np.ma.masked_array(data=bools, mask=mask, fill_value=True)

        # Masked unsigned 32-bit integer.
        uints = np.arange(nrow, dtype="u4")
        uints[mask] = 0
        table["m_u4"] = np.ma.masked_array(data=uints, mask=mask, fill_value=0)

        # Masked string and bytes columns.
        table["m_str"] = np.ma.masked_array(data=np.array(["text"] * nrow), mask=mask, fill_value="")
        table["m_byte"] = np.ma.masked_array(data=np.array([b"bytes"] * nrow), mask=mask, fill_value=b"")

    return table
def _makeSimpleArrowTable(include_multidim=False, include_masked=False):
    """Make an arrow table for testing.

    Parameters
    ----------
    include_multidim : `bool`
        Include multi-dimensional columns.
    include_masked : `bool`
        Include masked columns.

    Returns
    -------
    arrowTable : `pyarrow.Table`
        The test table.
    """
    # Build via astropy so units/descriptions land in the arrow metadata.
    astropy_table = _makeSimpleAstropyTable(
        include_multidim=include_multidim, include_masked=include_masked
    )
    return astropy_to_arrow(astropy_table)
@unittest.skipUnless(pd is not None, "Cannot test ParquetFormatterDataFrame without pandas.")
@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterDataFrame without pyarrow.")
class ParquetFormatterDataFrameTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, DataFrame, using local file datastore."""

    # Butler configuration used to create the per-test repository.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.run = "test_run"
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run=self.run
        )
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="DataFrame", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Remove the temporary butler root created in setUp."""
        removeTestTempDir(self.root)

    def testSingleIndexDataFrame(self):
        """Round-trip a single-index DataFrame and read its components
        and column-selection parameters.
        """
        df1, allColumns = _makeSingleIndexDataFrame(include_masked=True)

        self.butler.put(df1, self.datasetType, dataId={})
        # Read the whole DataFrame.
        df2 = self.butler.get(self.datasetType, dataId={})
        self.assertTrue(df1.equals(df2))
        # Read just the column descriptions.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertTrue(allColumns.equals(columns2))
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(df1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, DataFrameSchema(df1))
        # Read just some columns a few different ways.
        df3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        self.assertTrue(df1.loc[:, ["a", "c"]].equals(df3))
        df4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        self.assertTrue(df1.loc[:, ["a"]].equals(df4))
        # Requesting the index column explicitly returns the same frame as
        # requesting only "a" (the index always comes along).
        df5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        self.assertTrue(df1.loc[:, ["a"]].equals(df5))
        df6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        self.assertTrue(df1.loc[:, ["ddd"]].equals(df6))
        # Duplicate column requests are de-duplicated.
        df7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        self.assertTrue(df1.loc[:, ["a"]].equals(df7))
        # Glob patterns select all matching columns.
        df8 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d*"]})
        self.assertTrue(df1.loc[:, ["ddd", "dtn", "dtu"]].equals(df8))
        df9 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d*", "d*"]})
        self.assertTrue(df1.loc[:, ["ddd", "dtn", "dtu"]].equals(df9))
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

    def testSingleIndexDataFrameWithLists(self):
        """Round-trip a DataFrame containing list-valued columns."""
        df1, allColumns = _makeSingleIndexDataFrame(include_lists=True)

        self.butler.put(df1, self.datasetType, dataId={})
        # Read the whole DataFrame.
        df2 = self.butler.get(self.datasetType, dataId={})

        # We need to check the list columns specially because they go
        # from lists to arrays.
        for col in ["l1", "l2", "l3"]:
            for i in range(len(df1)):
                self.assertTrue(np.all(df2[col].values[i] == df1[col].values[i]))

    def testMultiIndexDataFrame(self):
        """Round-trip a multi-index DataFrame and read its components
        and column-selection parameters.
        """
        df1 = _makeMultiIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})
        # Read the whole DataFrame.
        df2 = self.butler.get(self.datasetType, dataId={})
        self.assertTrue(df1.equals(df2))
        # Read just the column descriptions.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertTrue(df1.columns.equals(columns2))
        self.assertEqual(columns2.names, df1.columns.names)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(df1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, DataFrameSchema(df1))
        # Read just some columns a few different ways.
        df3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": {"filter": "g"}})
        self.assertTrue(df1.loc[:, ["g"]].equals(df3))
        df4 = self.butler.get(
            self.datasetType, dataId={}, parameters={"columns": {"filter": ["r"], "column": "a"}}
        )
        self.assertTrue(df1.loc[:, [("r", "a")]].equals(df4))
        column_list = [("g", "a"), ("r", "c")]
        df5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": column_list})
        self.assertTrue(df1.loc[:, column_list].equals(df5))
        column_dict = {"filter": "r", "column": ["a", "b"]}
        df6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": column_dict})
        self.assertTrue(df1.loc[:, [("r", "a"), ("r", "b")]].equals(df6))
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d"]})

    def testSingleIndexDataFrameEmptyString(self):
        """Test persisting a single index dataframe with empty strings."""
        df1, _ = _makeSingleIndexDataFrame()

        # Set one of the strings to None
        df1.at[1, "strcol"] = None

        self.butler.put(df1, self.datasetType, dataId={})
        # Read the whole DataFrame.
        df2 = self.butler.get(self.datasetType, dataId={})
        self.assertTrue(df1.equals(df2))

    def testSingleIndexDataFrameAllEmptyStrings(self):
        """Test persisting a single index dataframe with an empty string
        column.
        """
        df1, _ = _makeSingleIndexDataFrame()

        # Set all of the strings to None
        df1.loc[0:, "strcol"] = None

        self.butler.put(df1, self.datasetType, dataId={})
        # Read the whole DataFrame.
        df2 = self.butler.get(self.datasetType, dataId={})
        self.assertTrue(df1.equals(df2))

    def testLegacyDataFrame(self):
        """Test writing a dataframe to parquet via pandas (without additional
        metadata) and ensure that we can read it back with all the new
        functionality.
        """
        df1, allColumns = _makeSingleIndexDataFrame()

        # Write the file directly with pandas, bypassing the butler
        # formatter, then ingest it.
        fname = os.path.join(self.root, "test_dataframe.parq")
        df1.to_parquet(fname)

        legacy_type = DatasetType(
            "legacy_dataframe",
            dimensions=(),
            storageClass="DataFrame",
            universe=self.butler.dimensions,
        )
        self.butler.registry.registerDatasetType(legacy_type)

        data_id = {}
        ref = DatasetRef(legacy_type, data_id, run=self.run)
        dataset = FileDataset(path=fname, refs=[ref], formatter=ParquetFormatter)

        self.butler.ingest(dataset, transfer="copy")

        self.butler.put(df1, self.datasetType, dataId={})

        # The legacy (pandas-written) and butler-written datasets must
        # read back identically, including components and parameters.
        df2a = self.butler.get(self.datasetType, dataId={})
        df2b = self.butler.get("legacy_dataframe", dataId={})
        self.assertTrue(df2a.equals(df2b))

        df3a = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a"]})
        df3b = self.butler.get("legacy_dataframe", dataId={}, parameters={"columns": ["a"]})
        self.assertTrue(df3a.equals(df3b))

        columns2a = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        columns2b = self.butler.get("legacy_dataframe.columns", dataId={})
        self.assertTrue(columns2a.equals(columns2b))

        rowcount2a = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        rowcount2b = self.butler.get("legacy_dataframe.rowcount", dataId={})
        self.assertEqual(rowcount2a, rowcount2b)

        schema2a = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        schema2b = self.butler.get("legacy_dataframe.schema", dataId={})
        self.assertEqual(schema2a, schema2b)

    def testDataFrameSchema(self):
        """Test DataFrameSchema construction, repr, and equality."""
        tab1 = _makeSimpleArrowTable()

        schema = DataFrameSchema.from_arrow(tab1.schema)

        self.assertIsInstance(schema.schema, pd.DataFrame)
        self.assertEqual(repr(schema), repr(schema._schema))
        self.assertNotEqual(schema, "not_a_schema")
        self.assertEqual(schema, schema)

        tab2 = _makeMultiIndexDataFrame()
        schema2 = DataFrameSchema(tab2)

        self.assertNotEqual(schema, schema2)

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteSingleIndexDataFrameReadAsAstropyTable(self):
        """Write a DataFrame and read it back as an astropy table."""
        df1, allColumns = _makeSingleIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

        tab2_df = tab2.to_pandas(index="index")
        self.assertTrue(df1.equals(tab2_df))

        # Check reading the columns.
        columns = list(tab2.columns.keys())
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        # We check the set because pandas reorders the columns.
        self.assertEqual(set(columns2), set(columns))

        # Check reading the schema.
        schema = ArrowAstropySchema(tab2)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowAstropySchema"
        )

        # The string types are objectified by pandas, and the order
        # will be changed because of pandas indexing.
        self.assertEqual(len(schema2.schema.columns), len(schema.schema.columns))
        for name in schema.schema.columns:
            self.assertIn(name, schema2.schema.columns)
            if schema2.schema[name].dtype != np.dtype("O"):
                self.assertEqual(schema2.schema[name].dtype, schema.schema[name].dtype)

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteSingleIndexDataFrameWithMaskedColsReadAsAstropyTable(self):
        """Write a DataFrame with masked columns and read it back as an
        astropy table.
        """
        # We need to special-case the write-as-pandas read-as-astropy code
        # with masks because pandas has multiple ways to use masked columns.
        # (The string column mask handling in particular is frustratingly
        # inconsistent.)
        df1, allColumns = _makeSingleIndexDataFrame(include_masked=True)

        self.butler.put(df1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")
        tab2_df = astropy_to_pandas(tab2, index="index")

        self.assertTrue(df1.columns.equals(tab2_df.columns))
        for name in tab2_df.columns:
            col1 = df1[name]
            col2 = tab2_df[name]

            if col1.hasnans:
                notNull = col1.notnull()
                self.assertTrue(notNull.equals(col2.notnull()))
                # Need to check value-by-value because column may
                # be made of objects, depending on what pandas decides.
                for index in notNull.values.nonzero()[0]:
                    self.assertEqual(col1[index], col2[index])
            else:
                self.assertTrue(col1.equals(col2))

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteMultiIndexDataFrameReadAsAstropyTable(self):
        """Write a multi-index DataFrame and confirm it is at least
        readable as an astropy table.
        """
        df1 = _makeMultiIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        _ = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

        # This is an odd duck, it doesn't really round-trip.
        # This test simply checks that it's readable, but definitely not
        # recommended.

    @unittest.skipUnless(atable is not None, "Cannot test writing as astropy without astropy.")
    def testWriteAstropyTableWithMaskedColsReadAsSingleIndexDataFrame(self):
        """Write an astropy table with masked columns and read it back as
        a DataFrame, checking mask preservation.
        """
        tab1 = _makeSimpleAstropyTable(include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={})

        tab1_df = astropy_to_pandas(tab1)
        self.assertTrue(tab1_df.equals(tab2))

        tab2_astropy = pandas_to_astropy(tab2)
        for col in tab1.dtype.names:
            np.testing.assert_array_equal(tab2_astropy[col], tab1[col])
            if isinstance(tab1[col], atable.column.MaskedColumn):
                np.testing.assert_array_equal(tab2_astropy[col].mask, tab1[col].mask)

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testWriteSingleIndexDataFrameReadAsArrowTable(self):
        """Write a DataFrame and read it back as an arrow table."""
        df1, allColumns = _makeSingleIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable")

        tab2_df = arrow_to_pandas(tab2)
        self.assertTrue(df1.equals(tab2_df))

        # Check reading the columns.
        columns = list(tab2.schema.names)
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        # We check the set because pandas reorders the columns.
        self.assertEqual(set(columns), set(columns2))

        # Override the component using a dataset type.
        columnsType = self.datasetType.makeComponentDatasetType("columns").overrideStorageClass(
            "ArrowColumnList"
        )
        self.assertEqual(columns2, self.butler.get(columnsType))

        # Check getting a component while overriding the storage class via
        # the dataset type. This overrides the parent storage class and then
        # selects the component.
        columnsType = self.datasetType.overrideStorageClass("ArrowAstropy").makeComponentDatasetType(
            "columns"
        )
        self.assertEqual(columns2, self.butler.get(columnsType))

        # Check reading the schema.
        schema = tab2.schema
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowSchema"
        )

        # These will not have the same metadata, nor will the string column
        # information be maintained.
        self.assertEqual(len(schema.names), len(schema2.names))
        for name in schema.names:
            if schema.field(name).type not in (pa.string(), pa.binary()):
                self.assertEqual(schema.field(name).type, schema2.field(name).type)

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testWriteMultiIndexDataFrameReadAsArrowTable(self):
        """Write a multi-index DataFrame and read it back as arrow."""
        df1 = _makeMultiIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable")

        tab2_df = arrow_to_pandas(tab2)
        self.assertTrue(df1.equals(tab2_df))

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
    def testWriteSingleIndexDataFrameReadAsNumpyTable(self):
        """Write a DataFrame and read it back as a numpy table."""
        df1, allColumns = _makeSingleIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy")

        tab2_df = pd.DataFrame.from_records(tab2, index=["index"])
        self.assertTrue(df1.equals(tab2_df))

        # Check reading the columns.
        columns = list(tab2.dtype.names)
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        # We check the set because pandas reorders the columns.
        self.assertEqual(set(columns2), set(columns))

        # Check reading the schema.
        schema = ArrowNumpySchema(tab2.dtype)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowNumpySchema"
        )

        # The string types will be objectified by pandas, and the order
        # will be changed because of pandas indexing.
        self.assertEqual(len(schema.schema.names), len(schema2.schema.names))
        for name in schema.schema.names:
            self.assertIn(name, schema2.schema.names)
            self.assertEqual(schema2.schema[name].type, schema.schema[name].type)

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
    def testWriteMultiIndexDataFrameReadAsNumpyTable(self):
        """Write a multi-index DataFrame and confirm it is at least
        readable as a numpy table.
        """
        df1 = _makeMultiIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        _ = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy")

        # This is an odd duck, it doesn't really round-trip.
        # This test simply checks that it's readable, but definitely not
        # recommended.

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy dict without numpy.")
    def testWriteSingleIndexDataFrameReadAsNumpyDict(self):
        """Write a DataFrame and read it back as a dict of numpy arrays."""
        df1, allColumns = _makeSingleIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict")

        tab2_df = pd.DataFrame.from_records(tab2, index=["index"])
        # The column order is not maintained.
        self.assertEqual(set(df1.columns), set(tab2_df.columns))
        for col in df1.columns:
            self.assertTrue(np.all(df1[col].values == tab2_df[col].values))

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy dict without numpy.")
    def testWriteMultiIndexDataFrameReadAsNumpyDict(self):
        """Write a multi-index DataFrame and confirm it is at least
        readable as a dict of numpy arrays.
        """
        df1 = _makeMultiIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        _ = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict")

        # This is an odd duck, it doesn't really round-trip.
        # This test simply checks that it's readable, but definitely not
        # recommended.

    def testBadDataFrameColumnParquet(self):
        """Putting a DataFrame with a mixed-type column should raise."""
        df1, allColumns = _makeSingleIndexDataFrame()

        # Make a column with mixed type.
        bad_col1 = [0.0] * len(df1)
        bad_col1[1] = 0.0 * units.nJy
        bad_df = df1.copy()
        bad_df["bad_col1"] = bad_col1

        # At the moment we cannot check that the correct note is added
        # to the exception, but that will be possible in the future.
        with self.assertRaises(RuntimeError):
            self.butler.put(bad_df, self.datasetType, dataId={})

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteReadAstropyTableLossless(self):
        """Write an astropy table through the DataFrame dataset type and
        read it back losslessly, checking default provenance metadata.
        """
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        put_ref = self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

        # Check that minimal provenance was written by default.
        expected = {
            "LSST.BUTLER.ID": str(put_ref.id),
            "LSST.BUTLER.RUN": "test_run",
            "LSST.BUTLER.DATASETTYPE": "data",
            "LSST.BUTLER.N_INPUTS": 0,
        }

        self.assertEqual(tab2.meta, expected)

        _checkAstropyTableEquality(tab1, tab2)

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteReadAstropyTableProvenance(self):
        """Test that dataset provenance is recorded in table metadata and
        stripped/replaced on re-put.
        """
        tab1 = _makeSimpleAstropyTable()

        # Create a ref for provenance.
        astropy_type = DatasetType(
            "astropy_parquet",
            dimensions=(),
            storageClass="ArrowAstropy",
            universe=self.butler.dimensions,
        )
        self.butler.registry.registerDatasetType(astropy_type)
        input_ref = DatasetRef(astropy_type, {}, run="other_run")
        quantum_id = uuid.uuid4()
        provenance = DatasetProvenance(quantum_id=quantum_id)
        provenance.add_input(input_ref)

        put_ref = self.butler.put(tab1, self.datasetType, dataId={}, provenance=provenance)

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

        expected = {
            "LSST.BUTLER.ID": str(put_ref.id),
            "LSST.BUTLER.RUN": "test_run",
            "LSST.BUTLER.DATASETTYPE": "data",
            "LSST.BUTLER.QUANTUM": str(quantum_id),
            "LSST.BUTLER.N_INPUTS": 1,
            "LSST.BUTLER.INPUT.0.ID": str(input_ref.id),
            "LSST.BUTLER.INPUT.0.RUN": "other_run",
            "LSST.BUTLER.INPUT.0.DATASETTYPE": "astropy_parquet",
        }
        self.assertEqual(tab2.meta, expected)

        # Put the dataset again, with different provenance and ensure
        # that the previous provenance was stripped.
        self.butler.collections.register("new_run")
        put_ref3 = self.butler.put(tab2, self.datasetType, dataId={}, run="new_run")

        # tab2 will have been updated in place.
        expected = {
            "LSST.BUTLER.ID": str(put_ref3.id),
            "LSST.BUTLER.RUN": "new_run",
            "LSST.BUTLER.DATASETTYPE": "data",
            "LSST.BUTLER.N_INPUTS": 0,
        }
        self.assertEqual(tab2.meta, expected)
        null_prov, prov_ref = DatasetProvenance.from_flat_dict(tab2.meta, self.butler)
        self.assertEqual(prov_ref, put_ref3)
        self.assertEqual(null_prov, DatasetProvenance())

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
    def testWriteReadNumpyTableLossless(self):
        """Write a numpy table through the DataFrame dataset type and read
        it back losslessly.
        """
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy")

        _checkNumpyTableEquality(tab1, tab2)

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testMaskedNumpy(self):
        """Test that arrow tables with masked columns convert to numpy
        masked arrays, and unmasked ones do not.
        """
        tab1 = _makeSimpleArrowTable(include_multidim=False, include_masked=True)
        tab1_np = arrow_to_numpy(tab1)
        self.assertIsInstance(tab1_np, np.ma.MaskedArray)
        # Stats on a masked column should ignore the nan in row 1.
        col = tab1_np["m_f8"]
        self.assertEqual(np.mean(col), 2.25, f"Column: {col}")

        # Now without a mask.
        tab1 = _makeSimpleArrowTable(include_multidim=False, include_masked=False)
        tab1_np = arrow_to_numpy(tab1)
        self.assertNotIsInstance(tab1_np, np.ma.MaskedArray)

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testWriteReadArrowTableLossless(self):
        """Write an arrow table through the DataFrame dataset type and
        read it back losslessly.
        """
        tab1 = _makeSimpleArrowTable(include_multidim=False, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable")

        self.assertEqual(tab1.schema, tab2.schema)
        tab1_np = arrow_to_numpy(tab1)
        tab2_np = arrow_to_numpy(tab2)
        for col in tab1.column_names:
            np.testing.assert_array_equal(tab2_np[col], tab1_np[col])

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy dict without numpy.")
    def testWriteReadNumpyDictLossless(self):
        """Write a numpy table and read it back losslessly as a dict of
        numpy arrays.
        """
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(tab1, self.datasetType, dataId={})

        dict2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict")

        _checkNumpyDictEquality(dict1, dict2)
@unittest.skipUnless(pd is not None, "Cannot test InMemoryDatastore with DataFrames without pandas.")
class InMemoryDataFrameDelegateTestCase(ParquetFormatterDataFrameTestCase):
    """Tests for InMemoryDatastore, using ArrowTableDelegate with Dataframe."""

    # In-memory datastore configuration; all inherited tests re-run
    # against it unless overridden below.
    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testBadDataFrameColumnParquet(self):
        """Override: disabled for the in-memory datastore."""
        # This test does not raise for an in-memory datastore.
        pass

    def testWriteMultiIndexDataFrameReadAsAstropyTable(self):
        """Override: in-memory conversion of a multi-index DataFrame to
        astropy raises instead of silently converting.
        """
        df1 = _makeMultiIndexDataFrame()

        self.butler.put(df1, self.datasetType, dataId={})

        with self.assertRaises(ValueError):
            _ = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

    def testLegacyDataFrame(self):
        """Override: disabled for the in-memory datastore."""
        # This test does not work with an inMemoryDatastore.
        pass

    def testBadInput(self):
        """Test delegate error handling for non-DataFrame inputs and
        unknown components.
        """
        df1, _ = _makeSingleIndexDataFrame()
        delegate = ArrowTableDelegate("DataFrame")

        with self.assertRaises(ValueError):
            delegate.handleParameters(inMemoryDataset="not_a_dataframe")

        with self.assertRaises(AttributeError):
            delegate.getComponent(composite=df1, componentName="nothing")

    def testStorageClass(self):
        """Test storage class lookup by python type for DataFrames."""
        df1, allColumns = _makeSingleIndexDataFrame()

        factory = StorageClassFactory()
        factory.addFromConfig(StorageClassConfig())

        storageClass = factory.findStorageClass(type(df1), compare_types=False)
        # Force the name lookup to do name matching.
        storageClass._pytype = None
        self.assertEqual(storageClass.name, "DataFrame")

        storageClass = factory.findStorageClass(type(df1), compare_types=True)
        # Force the name lookup to do name matching.
        storageClass._pytype = None
        self.assertEqual(storageClass.name, "DataFrame")
@unittest.skipUnless(atable is not None, "Cannot test ParquetFormatterArrowAstropy without astropy.")
@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterArrowAstropy without pyarrow.")
class ParquetFormatterArrowAstropyTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, ArrowAstropy, using local file datastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.run = "test_run"
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run=self.run
        )
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowAstropy", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        # Remove the per-test butler root created in setUp.
        removeTestTempDir(self.root)

    def testAstropyTable(self):
        """Round-trip an astropy table, including its components and
        column-subset parameters.
        """
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})
        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        _checkAstropyTableEquality(tab1, tab2)
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(tab1.dtype.names))
        for i, name in enumerate(tab1.dtype.names):
            self.assertEqual(columns2[i], name)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(tab1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, ArrowAstropySchema(tab1))
        # Read just some columns a few different ways.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        _checkAstropyTableEquality(tab1[("a", "c")], tab3)
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        _checkAstropyTableEquality(tab1[("a",)], tab4)
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        _checkAstropyTableEquality(tab1[("index", "a")], tab5)
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        _checkAstropyTableEquality(tab1[("ddd",)], tab6)
        # Duplicate column names in the parameter should be de-duplicated.
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        _checkAstropyTableEquality(tab1[("a",)], tab7)
        # Glob patterns should be expanded against the column names.
        tab8 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d??"]})
        _checkAstropyTableEquality(tab1[("ddd", "dtn", "dtu")], tab8)
        tab9 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d??", "a*"]})
        _checkAstropyTableEquality(tab1[("ddd", "dtn", "dtu", "a")], tab9)
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

    def testAstropyTableBigEndian(self):
        """Round-trip an astropy table with big-endian columns."""
        tab1 = _makeSimpleAstropyTable(include_bigendian=True)

        self.butler.put(tab1, self.datasetType, dataId={})
        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        _checkAstropyTableEquality(tab1, tab2, has_bigendian=True)

    def testAstropyTableWithMetadata(self):
        """Round-trip an astropy table carrying table-level metadata."""
        tab1 = _makeSimpleAstropyTable(include_multidim=True)

        meta = {
            "meta_a": 5,
            "meta_b": 10.0,
            "meta_c": [1, 2, 3],
            "meta_d": True,
            "meta_e": "string",
        }

        tab1.meta.update(meta)

        self.butler.put(tab1, self.datasetType, dataId={})
        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        # This will check that the metadata is equivalent as well.
        _checkAstropyTableEquality(tab1, tab2)

    def testArrowAstropySchema(self):
        """Exercise ArrowAstropySchema construction, repr, and equality."""
        tab1 = _makeSimpleAstropyTable()
        tab1_arrow = astropy_to_arrow(tab1)
        schema = ArrowAstropySchema.from_arrow(tab1_arrow.schema)

        self.assertIsInstance(schema.schema, atable.Table)
        self.assertEqual(repr(schema), repr(schema._schema))
        self.assertNotEqual(schema, "not_a_schema")
        self.assertEqual(schema, schema)

        # Test various inequalities
        # A renamed column changes the schema.
        tab2 = tab1.copy()
        tab2.rename_column("index", "index2")
        schema2 = ArrowAstropySchema(tab2)
        self.assertNotEqual(schema2, schema)

        # A changed unit changes the schema.
        tab2 = tab1.copy()
        tab2["index"].unit = units.micron
        schema2 = ArrowAstropySchema(tab2)
        self.assertNotEqual(schema2, schema)

        # A changed description changes the schema.
        tab2 = tab1.copy()
        tab2["index"].description = "Index column"
        schema2 = ArrowAstropySchema(tab2)
        self.assertNotEqual(schema2, schema)

        # A changed display format changes the schema.
        tab2 = tab1.copy()
        tab2["index"].format = "%05d"
        schema2 = ArrowAstropySchema(tab2)
        self.assertNotEqual(schema2, schema)

    def testAstropyParquet(self):
        """Ingest a parquet file written directly by astropy and check it
        reads identically to one written through the butler.
        """
        tab1 = _makeSimpleAstropyTable()

        # Remove datetime column which doesn't work with astropy currently.
        del tab1["dtn"]
        del tab1["dtu"]

        fname = os.path.join(self.root, "test_astropy.parq")
        tab1.write(fname)

        astropy_type = DatasetType(
            "astropy_parquet",
            dimensions=(),
            storageClass="ArrowAstropy",
            universe=self.butler.dimensions,
        )
        self.butler.registry.registerDatasetType(astropy_type)

        data_id = {}
        ref = DatasetRef(astropy_type, data_id, run=self.run)
        dataset = FileDataset(path=fname, refs=[ref], formatter=ParquetFormatter)

        self.butler.ingest(dataset, transfer="copy")

        self.butler.put(tab1, self.datasetType, dataId={})

        # The butler-written and astropy-written datasets must agree on
        # the full table and on every component.
        tab2a = self.butler.get(self.datasetType, dataId={})
        tab2b = self.butler.get("astropy_parquet", dataId={})
        _checkAstropyTableEquality(tab2a, tab2b)

        columns2a = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        columns2b = self.butler.get("astropy_parquet.columns", dataId={})
        self.assertEqual(len(columns2b), len(columns2a))
        for i, name in enumerate(columns2a):
            self.assertEqual(columns2b[i], name)

        rowcount2a = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        rowcount2b = self.butler.get("astropy_parquet.rowcount", dataId={})
        self.assertEqual(rowcount2a, rowcount2b)

        schema2a = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        schema2b = self.butler.get("astropy_parquet.schema", dataId={})
        self.assertEqual(schema2a, schema2b)

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testWriteAstropyReadAsArrowTable(self):
        """Write as astropy, read back with an ArrowTable storage class."""
        # This astropy <-> arrow works fine with masked columns.
        tab1 = _makeSimpleAstropyTable(include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable")

        tab2_astropy = arrow_to_astropy(tab2)
        _checkAstropyTableEquality(tab1, tab2_astropy)

        # Check reading the columns.
        columns = tab2.schema.names
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, columns)

        # Check reading the schema.
        schema = tab2.schema
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowSchema"
        )

        self.assertEqual(schema, schema2)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteAstropyReadAsDataFrame(self):
        """Write as astropy, read back with a DataFrame storage class."""
        tab1 = _makeSimpleAstropyTable()

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")

        # This is tricky because it loses the units and gains a bonus pandas
        # _index_ column, so we just test the dataframe form.

        tab1_df = tab1.to_pandas()
        self.assertTrue(tab1_df.equals(tab2))

        # Check reading the columns.
        columns = tab2.columns
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="DataFrameIndex"
        )
        self.assertTrue(columns.equals(columns2))

        # Check reading the schema.
        schema = DataFrameSchema(tab2)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="DataFrameSchema"
        )

        self.assertEqual(schema2, schema)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteAstropyWithMaskedColsReadAsDataFrame(self):
        # We need to special-case the write-as-astropy read-as-pandas code
        # with masks because pandas has multiple ways to use masked columns.
        # (When writing an astropy table with masked columns we get an object
        # column back, but each unmasked element has the correct type.)
        tab1 = _makeSimpleAstropyTable(include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")

        tab1_df = astropy_to_pandas(tab1)

        self.assertTrue(tab1_df.columns.equals(tab2.columns))
        for name in tab2.columns:
            col1 = tab1_df[name]
            col2 = tab2[name]

            if col1.hasnans:
                # Masked entries must line up; compare only unmasked values.
                notNull = col1.notnull()
                self.assertTrue(notNull.equals(col2.notnull()))
                # Need to check value-by-value because column may
                # be made of objects, depending on what pandas decides.
                for index in notNull.values.nonzero()[0]:
                    self.assertEqual(col1[index], col2[index])
            else:
                self.assertTrue(col1.equals(col2))

    @unittest.skipUnless(pd is not None, "Cannot test writing as a dataframe without pandas.")
    def testWriteSingleIndexDataFrameWithMaskedColsReadAsAstropyTable(self):
        """Write a masked-column dataframe, read back as astropy."""
        df1, allColumns = _makeSingleIndexDataFrame(include_masked=True)

        self.butler.put(df1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={})

        df1_tab = pandas_to_astropy(df1)

        _checkAstropyTableEquality(df1_tab, tab2)

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
    def testWriteAstropyReadAsNumpyTable(self):
        """Write as astropy, read back with an ArrowNumpy storage class."""
        tab1 = _makeSimpleAstropyTable()
        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy")

        # This is tricky because it loses the units.
        tab2_astropy = atable.Table(tab2)

        _checkAstropyTableEquality(tab1, tab2_astropy, skip_units=True)

        # Check reading the columns.
        columns = list(tab2.dtype.names)
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, columns)

        # Check reading the schema.
        schema = ArrowNumpySchema(tab2.dtype)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowNumpySchema"
        )

        self.assertEqual(schema2, schema)

    @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
    def testWriteAstropyReadAsNumpyDict(self):
        """Write as astropy, read back as a dict of numpy arrays."""
        tab1 = _makeSimpleAstropyTable()
        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict")

        # This is tricky because it loses the units.
        tab2_astropy = atable.Table(tab2)

        _checkAstropyTableEquality(tab1, tab2_astropy, skip_units=True)

    def testBadAstropyColumnParquet(self):
        """Columns that cannot be serialized must fail the put cleanly."""
        tab1 = _makeSimpleAstropyTable()

        # Make a column with mixed type.
        bad_col1 = [0.0] * len(tab1)
        bad_col1[1] = 0.0 * units.nJy
        bad_tab = tab1.copy()
        bad_tab["bad_col1"] = bad_col1

        # At the moment we cannot check that the correct note is added
        # to the exception, but that will be possible in the future.
        with self.assertRaises(RuntimeError):
            self.butler.put(bad_tab, self.datasetType, dataId={})

        # Make a column with ragged size.
        bad_col2 = [[0]] * len(tab1)
        bad_col2[1] = [0, 0]
        bad_tab = tab1.copy()
        bad_tab["bad_col2"] = bad_col2

        with self.assertRaises(RuntimeError):
            self.butler.put(bad_tab, self.datasetType, dataId={})

    @unittest.skipUnless(pd is not None, "Cannot test ParquetFormatterDataFrame without pandas.")
    def testWriteAstropyTableWithPandasIndexHint(self, testStrip=True):
        # testStrip is a keyword so the in-memory subclass can reuse the
        # test while skipping the hint-stripping check (unittest itself
        # always calls the method with the default).
        tab1 = _makeSimpleAstropyTable()

        add_pandas_index_to_astropy(tab1, "index")

        self.butler.put(tab1, self.datasetType, dataId={})

        # Read in as an astropy table and ensure index hint is still there.
        tab2 = self.butler.get(self.datasetType, dataId={})

        self.assertIn(ASTROPY_PANDAS_INDEX_KEY, tab2.meta)
        self.assertEqual(tab2.meta[ASTROPY_PANDAS_INDEX_KEY], "index")

        # Read as a dataframe and ensure index is set.
        df3 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")

        self.assertEqual(df3.index.name, "index")

        # Read as a dataframe without naming the index column.
        with self.assertLogs(level="WARNING") as cm:
            _ = self.butler.get(
                self.datasetType,
                dataId={},
                storageClass="DataFrame",
                parameters={"columns": ["a", "b"]},
            )
        self.assertIn("Index column ``index``", cm.output[0])

        if testStrip:
            # Read as an astropy table without naming the index column.
            tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "b"]})

            self.assertNotIn(ASTROPY_PANDAS_INDEX_KEY, tab5.meta)

        with self.assertRaises(ValueError):
            add_pandas_index_to_astropy(tab1, "not_a_column")
@unittest.skipUnless(atable is not None, "Cannot test InMemoryDatastore with AstropyTable without astropy.")
class InMemoryArrowAstropyDelegateTestCase(ParquetFormatterArrowAstropyTestCase):
    """Tests for InMemoryDatastore, using ArrowTableDelegate with
    AstropyTable.
    """

    # Same tests as the parent class, but against an in-memory datastore.
    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testAstropyParquet(self):
        # This test does not work with an inMemoryDatastore.
        pass

    def testBadAstropyColumnParquet(self):
        # This test does not raise for an in-memory datastore.
        pass

    def testBadInput(self):
        # The delegate must reject non-astropy datasets, tuple-valued
        # column parameters, and unknown component names.
        tab1 = _makeSimpleAstropyTable()
        delegate = ArrowTableDelegate("ArrowAstropy")

        with self.assertRaises(ValueError):
            delegate.handleParameters(inMemoryDataset="not_an_astropy_table")

        with self.assertRaises(NotImplementedError):
            delegate.handleParameters(inMemoryDataset=tab1, parameters={"columns": [("a", "b")]})

        with self.assertRaises(AttributeError):
            delegate.getComponent(composite=tab1, componentName="nothing")

    @unittest.skipUnless(pd is not None, "Cannot test ParquetFormatterDataFrame without pandas.")
    def testWriteAstropyTableWithPandasIndexHint(self):
        # Skip the hint-stripping check; it only applies to file
        # datastores where the parquet round-trip rewrites the metadata.
        super().testWriteAstropyTableWithPandasIndexHint(testStrip=False)
@unittest.skipUnless(np is not None, "Cannot test ParquetFormatterArrowNumpy without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterArrowNumpy without pyarrow.")
class ParquetFormatterArrowNumpyTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, ArrowNumpy, using local file datastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run="test_run"
        )
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowNumpy", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        # Remove the per-test butler root created in setUp.
        removeTestTempDir(self.root)

    def testNumpyTable(self):
        """Round-trip a numpy table, including its components and
        column-subset parameters.
        """
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})
        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        _checkNumpyTableEquality(tab1, tab2)
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(tab1.dtype.names))
        for i, name in enumerate(tab1.dtype.names):
            self.assertEqual(columns2[i], name)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(tab1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, ArrowNumpySchema(tab1.dtype))
        # Read just some columns a few different ways.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        _checkNumpyTableEquality(tab1[["a", "c"]], tab3)
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        _checkNumpyTableEquality(
            tab1[
                [
                    "a",
                ]
            ],
            tab4,
        )
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        _checkNumpyTableEquality(tab1[["index", "a"]], tab5)
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        _checkNumpyTableEquality(
            tab1[
                [
                    "ddd",
                ]
            ],
            tab6,
        )
        # Duplicate column names in the parameter should be de-duplicated.
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        _checkNumpyTableEquality(
            tab1[
                [
                    "a",
                ]
            ],
            tab7,
        )
        # Glob patterns should be expanded against the column names.
        tab8 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d??", "a*"]})
        _checkNumpyTableEquality(
            tab1[
                [
                    "ddd",
                    "dtn",
                    "dtu",
                    "a",
                ]
            ],
            tab8,
        )
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

    def testNumpyTableBigEndian(self):
        """Round-trip a numpy table with big-endian columns."""
        tab1 = _makeSimpleNumpyTable(include_bigendian=True)

        self.butler.put(tab1, self.datasetType, dataId={})
        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        _checkNumpyTableEquality(tab1, tab2, has_bigendian=True)

    def testArrowNumpySchema(self):
        """Exercise ArrowNumpySchema construction, repr, and equality."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        tab1_arrow = numpy_to_arrow(tab1)
        schema = ArrowNumpySchema.from_arrow(tab1_arrow.schema)

        self.assertIsInstance(schema.schema, np.dtype)
        self.assertEqual(repr(schema), repr(schema._dtype))
        self.assertNotEqual(schema, "not_a_schema")
        self.assertEqual(schema, schema)

        # Test inequality
        # A renamed field changes the schema.
        tab2 = tab1.copy()
        names = list(tab2.dtype.names)
        names[0] = "index2"
        tab2.dtype.names = names
        schema2 = ArrowNumpySchema(tab2.dtype)
        self.assertNotEqual(schema2, schema)

    @unittest.skipUnless(pa is not None, "Cannot test arrow conversions without pyarrow.")
    def testNumpyDictConversions(self):
        """Check numpy <-> numpy dict <-> arrow conversions round-trip."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        # Verify that everything round-trips, including the schema.
        tab1_arrow = numpy_to_arrow(tab1)
        tab1_dict = arrow_to_numpy_dict(tab1_arrow)
        tab1_dict_arrow = numpy_dict_to_arrow(tab1_dict)

        self.assertEqual(tab1_arrow.schema, tab1_dict_arrow.schema)
        self.assertEqual(tab1_arrow, tab1_dict_arrow)

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testWriteNumpyTableReadAsArrowTable(self):
        """Write as numpy, read back with an ArrowTable storage class."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable")

        tab2_numpy = arrow_to_numpy(tab2)

        _checkNumpyTableEquality(tab1, tab2_numpy)

        # Check reading the columns.
        columns = tab2.schema.names
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, columns)

        # Check reading the schema.
        schema = tab2.schema
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowSchema"
        )
        self.assertEqual(schema2, schema)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteNumpyTableReadAsDataFrame(self):
        """Write as numpy, read back with a DataFrame storage class."""
        tab1 = _makeSimpleNumpyTable()

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")

        # Converting this back to numpy gets confused with the index column
        # and changes the datatype of the string column.

        tab1_df = pd.DataFrame(tab1)

        self.assertTrue(tab1_df.equals(tab2))

        # Check reading the columns.
        columns = tab2.columns
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="DataFrameIndex"
        )
        self.assertTrue(columns.equals(columns2))

        # Check reading the schema.
        schema = DataFrameSchema(tab2)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="DataFrameSchema"
        )

        self.assertEqual(schema2, schema)

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteNumpyTableReadAsAstropyTable(self):
        """Write as numpy, read back with an ArrowAstropy storage class."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")
        tab2_numpy = tab2.as_array()

        _checkNumpyTableEquality(tab1, tab2_numpy)

        # Check reading the columns.
        columns = list(tab2.columns.keys())
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, columns)

        # Check reading the schema.
        schema = ArrowAstropySchema(tab2)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowAstropySchema"
        )

        self.assertEqual(schema2, schema)

    def testWriteNumpyTableReadAsNumpyDict(self):
        """Write as numpy, read back as a dict of numpy arrays."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict")
        tab2_numpy = _numpy_dict_to_numpy(tab2)

        _checkNumpyTableEquality(tab1, tab2_numpy)

    def testBadNumpyColumnParquet(self):
        """Columns that cannot be serialized must fail the put cleanly."""
        tab1 = _makeSimpleAstropyTable()

        # Make a column with mixed type.
        bad_col1 = [0.0] * len(tab1)
        bad_col1[1] = 0.0 * units.nJy
        bad_tab = tab1.copy()
        bad_tab["bad_col1"] = bad_col1

        bad_tab_np = bad_tab.as_array()

        # At the moment we cannot check that the correct note is added
        # to the exception, but that will be possible in the future.
        with self.assertRaises(RuntimeError):
            self.butler.put(bad_tab_np, self.datasetType, dataId={})

        # Make a column with ragged size.
        bad_col2 = [[0]] * len(tab1)
        bad_col2[1] = [0, 0]
        bad_tab = tab1.copy()
        bad_tab["bad_col2"] = bad_col2

        bad_tab_np = bad_tab.as_array()

        with self.assertRaises(RuntimeError):
            self.butler.put(bad_tab_np, self.datasetType, dataId={})

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteReadAstropyTableLossless(self):
        """Write an astropy table into the numpy dataset type and verify a
        lossless read back as ArrowAstropy.
        """
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

        _checkAstropyTableEquality(tab1, tab2)
# Fix: skip message said "ImMemoryDatastore"; corrected to match the
# sibling in-memory test cases ("InMemoryDatastore").
@unittest.skipUnless(np is not None, "Cannot test InMemoryDatastore with Numpy table without numpy.")
class InMemoryArrowNumpyDelegateTestCase(ParquetFormatterArrowNumpyTestCase):
    """Tests for InMemoryDatastore, using ArrowTableDelegate with
    Numpy table.
    """

    # Same tests as the parent class, but against an in-memory datastore.
    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testBadNumpyColumnParquet(self):
        # This test does not raise for an in-memory datastore.
        pass

    def testBadInput(self):
        # The delegate must reject non-numpy datasets, tuple-valued
        # column parameters, and unknown component names.
        tab1 = _makeSimpleNumpyTable()
        delegate = ArrowTableDelegate("ArrowNumpy")

        with self.assertRaises(ValueError):
            delegate.handleParameters(inMemoryDataset="not_a_numpy_table")

        with self.assertRaises(NotImplementedError):
            delegate.handleParameters(inMemoryDataset=tab1, parameters={"columns": [("a", "b")]})

        with self.assertRaises(AttributeError):
            delegate.getComponent(composite=tab1, componentName="nothing")

    def testStorageClass(self):
        # Lookup by python type should resolve to "ArrowNumpy" both with
        # and without type comparison.
        tab1 = _makeSimpleNumpyTable()

        factory = StorageClassFactory()
        factory.addFromConfig(StorageClassConfig())

        storageClass = factory.findStorageClass(type(tab1), compare_types=False)
        # Force the name lookup to do name matching.
        storageClass._pytype = None
        self.assertEqual(storageClass.name, "ArrowNumpy")

        storageClass = factory.findStorageClass(type(tab1), compare_types=True)
        # Force the name lookup to do name matching.
        storageClass._pytype = None
        self.assertEqual(storageClass.name, "ArrowNumpy")
1644@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterArrowTable without pyarrow.")
1645class ParquetFormatterArrowTableTestCase(unittest.TestCase):
1646 """Tests for ParquetFormatter, ArrowTable, using local file datastore."""
1648 configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run="test_run"
        )
        # Registered with the test so the butler is closed automatically.
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowTable", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)
    def tearDown(self):
        # Remove the per-test butler root created in setUp.
        removeTestTempDir(self.root)
    def testArrowTable(self):
        """Round-trip an arrow table, including its components and
        column-subset parameters.
        """
        tab1 = _makeSimpleArrowTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})
        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        # We convert to use the numpy testing framework to handle nan
        # comparisons.
        self.assertEqual(tab1.schema, tab2.schema)
        tab1_np = arrow_to_numpy(tab1)
        tab2_np = arrow_to_numpy(tab2)
        for col in tab1.column_names:
            np.testing.assert_array_equal(tab2_np[col], tab1_np[col])
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(tab1.schema.names))
        for i, name in enumerate(tab1.schema.names):
            self.assertEqual(columns2[i], name)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(tab1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, tab1.schema)
        # Read just some columns a few different ways.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        self.assertEqual(tab3, tab1.select(("a", "c")))
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        self.assertEqual(tab4, tab1.select(("a",)))
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        self.assertEqual(tab5, tab1.select(("index", "a")))
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        self.assertEqual(tab6, tab1.select(("ddd",)))
        # Duplicate column names in the parameter should be de-duplicated.
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        self.assertEqual(tab7, tab1.select(("a",)))
        # Glob patterns should be expanded against the column names.
        tab8 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a*", "d??"]})
        self.assertEqual(tab8, tab1.select(("a", "ddd", "dtn", "dtu")))
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})
    def testEmptyArrowTable(self):
        """Check that a zero-row arrow table round-trips through the butler
        and through the numpy/pandas/astropy converters.
        """
        data = _makeSimpleNumpyTable()
        type_list = _numpy_dtype_to_arrow_types(data.dtype)

        # Build an arrow table with the right schema but no rows.
        schema = pa.schema(type_list)
        arrays = [[]] * len(schema.names)

        tab1 = pa.Table.from_arrays(arrays, schema=schema)

        self.butler.put(tab1, self.datasetType, dataId={})
        tab2 = self.butler.get(self.datasetType, dataId={})
        self.assertEqual(tab2, tab1)

        # Empty table must survive an arrow <-> numpy round trip.
        tab1_numpy = arrow_to_numpy(tab1)
        self.assertEqual(len(tab1_numpy), 0)
        tab1_numpy_arrow = numpy_to_arrow(tab1_numpy)
        self.assertEqual(tab1_numpy_arrow, tab1)

        # Empty table must survive an arrow <-> pandas round trip.
        tab1_pandas = arrow_to_pandas(tab1)
        self.assertEqual(len(tab1_pandas), 0)
        tab1_pandas_arrow = pandas_to_arrow(tab1_pandas)
        # Unfortunately, string/byte columns get mangled when translated
        # through empty pandas dataframes.
        self.assertEqual(
            tab1_pandas_arrow.select(("index", "a", "b", "c", "ddd")),
            tab1.select(("index", "a", "b", "c", "ddd")),
        )

        # Empty table must survive an arrow <-> astropy round trip.
        tab1_astropy = arrow_to_astropy(tab1)
        self.assertEqual(len(tab1_astropy), 0)
        tab1_astropy_arrow = astropy_to_arrow(tab1_astropy)
        self.assertEqual(tab1_astropy_arrow, tab1)
    def testEmptyArrowTableMultidim(self):
        """Check that a zero-row arrow table with multidimensional column
        metadata round-trips through the butler and converters.
        """
        data = _makeSimpleNumpyTable(include_multidim=True)
        type_list = _numpy_dtype_to_arrow_types(data.dtype)

        # Record the multidim shapes in the schema metadata, as the
        # formatter does for real tables.
        md = {}
        for name in data.dtype.names:
            _append_numpy_multidim_metadata(md, name, data.dtype[name])

        schema = pa.schema(type_list, metadata=md)
        arrays = [[]] * len(schema.names)

        tab1 = pa.Table.from_arrays(arrays, schema=schema)

        self.butler.put(tab1, self.datasetType, dataId={})
        tab2 = self.butler.get(self.datasetType, dataId={})
        self.assertEqual(tab2, tab1)

        # Empty multidim table must survive an arrow <-> numpy round trip.
        tab1_numpy = arrow_to_numpy(tab1)
        self.assertEqual(len(tab1_numpy), 0)
        tab1_numpy_arrow = numpy_to_arrow(tab1_numpy)
        self.assertEqual(tab1_numpy_arrow, tab1)

        # Empty multidim table must survive an arrow <-> astropy round trip.
        tab1_astropy = arrow_to_astropy(tab1)
        self.assertEqual(len(tab1_astropy), 0)
        tab1_astropy_arrow = astropy_to_arrow(tab1_astropy)
        self.assertEqual(tab1_astropy_arrow, tab1)
1769 @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
1770 def testWriteArrowTableReadAsSingleIndexDataFrame(self):
1771 df1, allColumns = _makeSingleIndexDataFrame()
1773 self.butler.put(df1, self.datasetType, dataId={})
1775 # Read back out as a dataframe.
1776 df2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")
1777 self.assertTrue(df1.equals(df2))
1779 # Read back out as an arrow table, convert to dataframe.
1780 tab3 = self.butler.get(self.datasetType, dataId={})
1781 df3 = arrow_to_pandas(tab3)
1782 self.assertTrue(df1.equals(df3))
1784 # Check reading the columns.
1785 columns = df2.reset_index().columns
1786 columns2 = self.butler.get(
1787 self.datasetType.componentTypeName("columns"), dataId={}, storageClass="DataFrameIndex"
1788 )
1789 # We check the set because pandas reorders the columns.
1790 self.assertEqual(set(columns2.to_list()), set(columns.to_list()))
1792 # Check reading the schema.
1793 schema = DataFrameSchema(df1)
1794 schema2 = self.butler.get(
1795 self.datasetType.componentTypeName("schema"), dataId={}, storageClass="DataFrameSchema"
1796 )
1797 self.assertEqual(schema2, schema)
1799 @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
1800 def testWriteArrowTableReadAsMultiIndexDataFrame(self):
1801 df1 = _makeMultiIndexDataFrame()
1803 self.butler.put(df1, self.datasetType, dataId={})
1805 # Read back out as a dataframe.
1806 df2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")
1807 self.assertTrue(df1.equals(df2))
1809 # Read back out as an arrow table, convert to dataframe.
1810 atab3 = self.butler.get(self.datasetType, dataId={})
1811 df3 = arrow_to_pandas(atab3)
1812 self.assertTrue(df1.equals(df3))
1814 # Check reading the columns.
1815 columns = df2.columns
1816 columns2 = self.butler.get(
1817 self.datasetType.componentTypeName("columns"), dataId={}, storageClass="DataFrameIndex"
1818 )
1819 self.assertTrue(columns2.equals(columns))
1821 # Check reading the schema.
1822 schema = DataFrameSchema(df1)
1823 schema2 = self.butler.get(
1824 self.datasetType.componentTypeName("schema"), dataId={}, storageClass="DataFrameSchema"
1825 )
1826 self.assertEqual(schema2, schema)
    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteArrowTableReadAsAstropyTable(self):
        """Write an arrow-compatible table and read it back as astropy,
        checking components and schema metadata round-tripping.
        """
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        # Read back out as an astropy table.
        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")
        _checkAstropyTableEquality(tab1, tab2)

        # Read back out as an arrow table, convert to astropy table.
        atab3 = self.butler.get(self.datasetType, dataId={})
        tab3 = arrow_to_astropy(atab3)
        _checkAstropyTableEquality(tab1, tab3)

        # Check reading the columns.
        columns = list(tab2.columns.keys())
        columns2 = self.butler.get(
            self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
        )
        self.assertEqual(columns2, columns)

        # Check reading the schema.
        schema = ArrowAstropySchema(tab1)
        schema2 = self.butler.get(
            self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowAstropySchema"
        )
        self.assertEqual(schema2, schema)

        # Check the schema conversions and units: every non-empty
        # description/unit in the arrow field metadata must appear on the
        # corresponding astropy column schema.
        arrow_schema = schema.to_arrow_schema()
        for name in arrow_schema.names:
            field_metadata = arrow_schema.field(name).metadata
            if (
                b"description" in field_metadata
                and (description := field_metadata[b"description"].decode("UTF-8")) != ""
            ):
                self.assertEqual(schema2.schema[name].description, description)
            else:
                self.assertIsNone(schema2.schema[name].description)
            if b"unit" in field_metadata and (unit := field_metadata[b"unit"].decode("UTF-8")) != "":
                self.assertEqual(schema2.schema[name].unit, units.Unit(unit))
1871 @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
1872 def testWriteArrowTableReadAsNumpyTable(self):
1873 tab1 = _makeSimpleNumpyTable(include_multidim=True)
1875 self.butler.put(tab1, self.datasetType, dataId={})
1877 # Read back out as a numpy table.
1878 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy")
1879 _checkNumpyTableEquality(tab1, tab2)
1881 # Read back out as an arrow table, convert to numpy table.
1882 atab3 = self.butler.get(self.datasetType, dataId={})
1883 tab3 = arrow_to_numpy(atab3)
1884 _checkNumpyTableEquality(tab1, tab3)
1886 # Check reading the columns.
1887 columns = list(tab2.dtype.names)
1888 columns2 = self.butler.get(
1889 self.datasetType.componentTypeName("columns"), dataId={}, storageClass="ArrowColumnList"
1890 )
1891 self.assertEqual(columns2, columns)
1893 # Check reading the schema.
1894 schema = ArrowNumpySchema(tab1.dtype)
1895 schema2 = self.butler.get(
1896 self.datasetType.componentTypeName("schema"), dataId={}, storageClass="ArrowNumpySchema"
1897 )
1898 self.assertEqual(schema2, schema)
1900 @unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
1901 def testWriteArrowTableReadAsNumpyDict(self):
1902 tab1 = _makeSimpleNumpyTable(include_multidim=True)
1904 self.butler.put(tab1, self.datasetType, dataId={})
1906 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpyDict")
1907 tab2_numpy = _numpy_dict_to_numpy(tab2)
1908 _checkNumpyTableEquality(tab1, tab2_numpy)
1910 @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
1911 def testWriteReadAstropyTableLossless(self):
1912 tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)
1914 self.butler.put(tab1, self.datasetType, dataId={})
1916 tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")
1918 _checkAstropyTableEquality(tab1, tab2)
@unittest.skipUnless(pa is not None, "Cannot test InMemoryDatastore with ArrowTable without pyarrow.")
class InMemoryArrowTableDelegateTestCase(ParquetFormatterArrowTableTestCase):
    """Tests for InMemoryDatastore, using ArrowTableDelegate."""

    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testBadInput(self):
        """Check that the delegate rejects unusable inputs and parameters."""
        tab1 = _makeSimpleArrowTable()
        delegate = ArrowTableDelegate("ArrowTable")

        # Non-arrow input is rejected outright.
        with self.assertRaises(ValueError):
            delegate.handleParameters(inMemoryDataset="not_an_arrow_table")

        # Tuple-based column selection is not implemented for arrow tables.
        with self.assertRaises(NotImplementedError):
            delegate.handleParameters(inMemoryDataset=tab1, parameters={"columns": [("a", "b")]})

        # Requesting an unknown component raises AttributeError.
        with self.assertRaises(AttributeError):
            delegate.getComponent(composite=tab1, componentName="nothing")

    def testStorageClass(self):
        """Check storage-class lookup by python type, with and without
        type comparison.
        """
        tab1 = _makeSimpleArrowTable()

        factory = StorageClassFactory()
        factory.addFromConfig(StorageClassConfig())

        storageClass = factory.findStorageClass(type(tab1), compare_types=False)
        # Force the name lookup to do name matching.
        storageClass._pytype = None
        self.assertEqual(storageClass.name, "ArrowTable")

        storageClass = factory.findStorageClass(type(tab1), compare_types=True)
        # Force the name lookup to do name matching.
        storageClass._pytype = None
        self.assertEqual(storageClass.name, "ArrowTable")
@unittest.skipUnless(np is not None, "Cannot test ParquetFormatterArrowNumpy without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test ParquetFormatterArrowNumpy without pyarrow.")
class ParquetFormatterArrowNumpyDictTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, ArrowNumpyDict, using local file store."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run="test_run"
        )
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowNumpyDict", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Remove the temporary butler root."""
        removeTestTempDir(self.root)

    def testNumpyDict(self):
        """Round-trip a numpy dict and exercise the components and
        column-selection parameters.
        """
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})
        # Read the whole table.
        dict2 = self.butler.get(self.datasetType, dataId={})
        _checkNumpyDictEquality(dict1, dict2)
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(dict1.keys()))
        for name in dict1:
            self.assertIn(name, columns2)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(dict1["a"]))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, ArrowNumpySchema(tab1.dtype))
        # Read just some columns a few different ways.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        subdict = {key: dict1[key] for key in ["a", "c"]}
        _checkNumpyDictEquality(subdict, tab3)
        # A bare string selects a single column.
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        subdict = {key: dict1[key] for key in ["a"]}
        _checkNumpyDictEquality(subdict, tab4)
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        subdict = {key: dict1[key] for key in ["index", "a"]}
        _checkNumpyDictEquality(subdict, tab5)
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        subdict = {key: dict1[key] for key in ["ddd"]}
        _checkNumpyDictEquality(subdict, tab6)
        # Duplicate column requests are de-duplicated.
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        subdict = {key: dict1[key] for key in ["a"]}
        _checkNumpyDictEquality(subdict, tab7)
        # Glob patterns are expanded against the column names.
        tab8 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d??", "a*"]})
        subdict = {key: dict1[key] for key in ["ddd", "dtn", "dtu", "a"]}
        _checkNumpyDictEquality(subdict, tab8)
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

    @unittest.skipUnless(pa is not None, "Cannot test reading as arrow without pyarrow.")
    def testWriteNumpyDictReadAsArrowTable(self):
        """Write a numpy dict and read it back as an arrow table."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowTable")

        tab2_dict = arrow_to_numpy_dict(tab2)

        _checkNumpyDictEquality(dict1, tab2_dict)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe without pandas.")
    def testWriteNumpyDictReadAsDataFrame(self):
        """Write a numpy dict and read it back as a pandas dataframe."""
        tab1 = _makeSimpleNumpyTable()
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrame")

        # The order of the dict may get mixed up, so we need to check column
        # by column. We also need to do this in dataframe form because pandas
        # changes the datatype of the string column.
        tab1_df = pd.DataFrame(tab1)

        self.assertEqual(set(tab1_df.columns), set(tab2.columns))
        for col in tab1_df.columns:
            self.assertTrue(np.all(tab1_df[col].values == tab2[col].values))

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteNumpyDictReadAsAstropyTable(self):
        """Write a numpy dict and read it back as an astropy table."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")
        tab2_dict = _astropy_to_numpy_dict(tab2)

        _checkNumpyDictEquality(dict1, tab2_dict)

    def testWriteNumpyDictReadAsNumpyTable(self):
        """Write a numpy dict and read it back as a structured array."""
        tab1 = _makeSimpleNumpyTable(include_multidim=True)
        dict1 = _numpy_to_numpy_dict(tab1)

        self.butler.put(dict1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpy")
        tab2_dict = _numpy_to_numpy_dict(tab2)

        _checkNumpyDictEquality(dict1, tab2_dict)

    def testWriteNumpyDictBad(self):
        """Ill-formed dicts (scalar values, mismatched lengths, non-array
        values, object dtypes) should be rejected at put time.
        """
        dict1 = {"a": 4, "b": np.ndarray([1])}
        with self.assertRaises(RuntimeError):
            self.butler.put(dict1, self.datasetType, dataId={})

        dict2 = {"a": np.zeros(4), "b": np.zeros(5)}
        with self.assertRaises(RuntimeError):
            self.butler.put(dict2, self.datasetType, dataId={})

        dict3 = {"a": [0] * 5, "b": np.zeros(5)}
        with self.assertRaises(RuntimeError):
            self.butler.put(dict3, self.datasetType, dataId={})

        dict4 = {"a": np.zeros(4), "b": np.zeros(4, dtype="O")}
        with self.assertRaises(RuntimeError):
            self.butler.put(dict4, self.datasetType, dataId={})

    @unittest.skipUnless(atable is not None, "Cannot test reading as astropy without astropy.")
    def testWriteReadAstropyTableLossless(self):
        """Multidim/masked astropy tables should survive a round trip."""
        tab1 = _makeSimpleAstropyTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        tab2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropy")

        _checkAstropyTableEquality(tab1, tab2)
@unittest.skipUnless(np is not None, "Cannot test InMemoryDatastore with NumpyDict without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test InMemoryDatastore with NumpyDict without pyarrow.")
class InMemoryNumpyDictDelegateTestCase(ParquetFormatterArrowNumpyDictTestCase):
    """Re-run the ArrowNumpyDict tests against an in-memory datastore
    via ArrowTableDelegate.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

    def testWriteNumpyDictBad(self):
        # Skipped: the in-memory datastore does not perform the sub-type
        # checking that the file-backed variant exercises.
        pass
@unittest.skipUnless(pa is not None, "Cannot test ArrowSchema without pyarrow.")
class ParquetFormatterArrowSchemaTestCase(unittest.TestCase):
    """Tests for ParquetFormatter, ArrowSchema, using local file datastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self):
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        config = Config(self.configFile)
        self.butler = Butler.from_config(
            Butler.makeRepo(self.root, config=config), writeable=True, run="test_run"
        )
        self.enterContext(self.butler)
        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowSchema", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Remove the temporary butler root."""
        removeTestTempDir(self.root)

    def _makeTestSchema(self):
        """Build an arrow schema covering scalar, list, string, and binary
        fields with varying metadata (including fields with empty or
        missing description/unit).

        Returns
        -------
        schema : `pyarrow.Schema`
        """
        schema = pa.schema(
            [
                pa.field(
                    "int32",
                    pa.int32(),
                    nullable=False,
                    metadata={
                        "description": "32-bit integer",
                        "unit": "",
                    },
                ),
                pa.field(
                    "int64",
                    pa.int64(),
                    nullable=False,
                    metadata={
                        "description": "64-bit integer",
                        "unit": "",
                    },
                ),
                pa.field(
                    "uint64",
                    pa.uint64(),
                    nullable=False,
                    metadata={
                        "description": "64-bit unsigned integer",
                        "unit": "",
                    },
                ),
                pa.field(
                    "float32",
                    pa.float32(),
                    nullable=False,
                    metadata={
                        "description": "32-bit float",
                        "unit": "count",
                    },
                ),
                pa.field(
                    "float64",
                    pa.float64(),
                    nullable=False,
                    metadata={
                        "description": "64-bit float",
                        "unit": "nJy",
                    },
                ),
                pa.field(
                    "fixed_size_list",
                    pa.list_(pa.float64(), list_size=10),
                    nullable=False,
                    metadata={
                        "description": "Fixed size list of 64-bit floats.",
                        "unit": "nJy",
                    },
                ),
                pa.field(
                    "variable_size_list",
                    pa.list_(pa.float64()),
                    nullable=False,
                    metadata={
                        "description": "Variable size list of 64-bit floats.",
                        "unit": "nJy",
                    },
                ),
                # One of these fields will have no description.
                pa.field(
                    "string",
                    pa.string(),
                    nullable=False,
                    metadata={
                        "unit": "",
                    },
                ),
                # One of these fields will have no metadata.
                pa.field(
                    "binary",
                    pa.binary(),
                    nullable=False,
                ),
            ]
        )

        return schema

    def testArrowSchema(self):
        """An ArrowSchema should round-trip through put/get unchanged."""
        schema1 = self._makeTestSchema()
        self.butler.put(schema1, self.datasetType, dataId={})

        schema2 = self.butler.get(self.datasetType, dataId={})
        self.assertEqual(schema2, schema1)

    @unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe schema without pandas.")
    def testWriteArrowSchemaReadAsDataFrameSchema(self):
        """Read the stored schema back converted to a DataFrameSchema."""
        schema1 = self._makeTestSchema()
        self.butler.put(schema1, self.datasetType, dataId={})

        df_schema1 = DataFrameSchema.from_arrow(schema1)

        df_schema2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrameSchema")
        self.assertEqual(df_schema2, df_schema1)

    @unittest.skipUnless(atable is not None, "Cannot test reading as an astropy schema without astropy.")
    def testWriteArrowSchemaReadAsArrowAstropySchema(self):
        """Read the stored schema back converted to an ArrowAstropySchema
        and check that units and descriptions survive.
        """
        schema1 = self._makeTestSchema()
        self.butler.put(schema1, self.datasetType, dataId={})

        ap_schema1 = ArrowAstropySchema.from_arrow(schema1)

        ap_schema2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropySchema")
        self.assertEqual(ap_schema2, ap_schema1)

        # Confirm that the ap_schema2 has the unit/description we expect.
        for name in schema1.names:
            field_metadata = schema1.field(name).metadata
            if field_metadata is None:
                # The "binary" field carries no metadata at all.
                continue
            if (
                b"description" in field_metadata
                and (description := field_metadata[b"description"].decode("UTF-8")) != ""
            ):
                self.assertEqual(ap_schema2.schema[name].description, description)
            else:
                self.assertIsNone(ap_schema2.schema[name].description)
            if b"unit" in field_metadata and (unit := field_metadata[b"unit"].decode("UTF-8")) != "":
                self.assertEqual(ap_schema2.schema[name].unit, units.Unit(unit))

    # This test exercises ArrowNumpySchema, so the guard (and the message)
    # must depend on numpy, not astropy.
    @unittest.skipUnless(np is not None, "Cannot test reading as a numpy schema without numpy.")
    def testWriteArrowSchemaReadAsArrowNumpySchema(self):
        """Read the stored schema back converted to an ArrowNumpySchema."""
        schema1 = self._makeTestSchema()
        self.butler.put(schema1, self.datasetType, dataId={})

        np_schema1 = ArrowNumpySchema.from_arrow(schema1)

        np_schema2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpySchema")
        self.assertEqual(np_schema2, np_schema1)
@unittest.skipUnless(pa is not None, "Cannot test InMemoryDatastore with ArrowSchema without pyarrow.")
class InMemoryArrowSchemaDelegateTestCase(ParquetFormatterArrowSchemaTestCase):
    """Tests for InMemoryDatastore and ArrowSchema.

    Re-runs the inherited ArrowSchema tests against the in-memory
    datastore configuration.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")
@unittest.skipUnless(pa is not None, "Cannot test S3 without pyarrow.")
@unittest.skipUnless(boto3 is not None, "Cannot test S3 without boto3.")
@unittest.skipUnless(fsspec is not None, "Cannot test S3 without fsspec.")
@unittest.skipUnless(s3fs is not None, "Cannot test S3 without s3fs.")
class ParquetFormatterArrowTableS3TestCase(unittest.TestCase):
    """Tests for arrow table/parquet with S3."""

    # Code is adapted from test_butler.py
    configFile = os.path.join(TESTDIR, "config/basic/butler-s3store.yaml")
    fullConfigKey = None
    validationCanFail = True

    # Default bucket name; overwritten in setUp from the config.
    bucketName = "anybucketname"

    root = "butlerRoot/"

    datastoreStr = [f"datastore={root}"]

    # NOTE(review): unlike datastoreStr above, this is NOT an f-string, so
    # the braces are literal. Both attributes are reassigned in setUp, so
    # this appears harmless — confirm intent.
    datastoreName = ["FileDatastore@s3://{bucketName}/{root}"]

    registryStr = "/gen3.sqlite3"

    mock_aws = mock_aws()

    def setUp(self):
        """Create a mocked S3 bucket plus a butler repo rooted in it."""
        self.root = makeTestTempDir(TESTDIR)

        config = Config(self.configFile)
        uri = ResourcePath(config[".datastore.datastore.root"])
        self.bucketName = uri.netloc

        # Enable S3 mocking of tests.
        self.enterContext(clean_test_environment_for_s3())
        self.mock_aws.start()

        rooturi = f"s3://{self.bucketName}/{self.root}"
        config.update({"datastore": {"datastore": {"root": rooturi}}})

        # need local folder to store registry database
        self.reg_dir = makeTestTempDir(TESTDIR)
        config["registry", "db"] = f"sqlite:///{self.reg_dir}/gen3.sqlite3"

        # MOTO needs to know that we expect Bucket bucketname to exist
        # (this used to be the class attribute bucketName)
        s3 = boto3.resource("s3")
        s3.create_bucket(Bucket=self.bucketName)

        self.datastoreStr = [f"datastore='{rooturi}'"]
        self.datastoreName = [f"FileDatastore@{rooturi}"]
        Butler.makeRepo(rooturi, config=config, forceConfigRoot=False)
        self.tmpConfigFile = posixpath.join(rooturi, "butler.yaml")

        self.butler = Butler(self.tmpConfigFile, writeable=True, run="test_run")
        self.enterContext(self.butler)

        # No dimensions in dataset type so we don't have to worry about
        # inserting dimension data or defining data IDs.
        self.datasetType = DatasetType(
            "data", dimensions=(), storageClass="ArrowTable", universe=self.butler.dimensions
        )
        self.butler.registry.registerDatasetType(self.datasetType)

    def tearDown(self):
        """Empty and delete the mocked bucket, stop mocking, and clean up
        the local registry and temp directories.
        """
        s3 = boto3.resource("s3")
        bucket = s3.Bucket(self.bucketName)
        try:
            bucket.objects.all().delete()
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                # the key was not reachable - pass
                pass
            else:
                raise

        bucket = s3.Bucket(self.bucketName)
        bucket.delete()

        # Stop the S3 mock.
        self.mock_aws.stop()

        if self.reg_dir is not None and os.path.exists(self.reg_dir):
            shutil.rmtree(self.reg_dir, ignore_errors=True)

        if os.path.exists(self.root):
            shutil.rmtree(self.root, ignore_errors=True)

    def testArrowTableS3(self):
        """Round-trip an arrow table through the S3-backed datastore and
        exercise components and column-selection parameters.
        """
        tab1 = _makeSimpleArrowTable(include_multidim=True, include_masked=True)

        self.butler.put(tab1, self.datasetType, dataId={})

        # Read the whole Table.
        tab2 = self.butler.get(self.datasetType, dataId={})
        # We convert to use the numpy testing framework to handle nan
        # comparisons.
        self.assertEqual(tab1.schema, tab2.schema)
        tab1_np = arrow_to_numpy(tab1)
        tab2_np = arrow_to_numpy(tab2)
        for col in tab1.column_names:
            np.testing.assert_array_equal(tab2_np[col], tab1_np[col])
        # Read the columns.
        columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
        self.assertEqual(len(columns2), len(tab1.schema.names))
        for i, name in enumerate(tab1.schema.names):
            self.assertEqual(columns2[i], name)
        # Read the rowcount.
        rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
        self.assertEqual(rowcount, len(tab1))
        # Read the schema.
        schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
        self.assertEqual(schema, tab1.schema)
        # Read just some columns a few different ways.
        tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
        self.assertEqual(tab3, tab1.select(("a", "c")))
        tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
        self.assertEqual(tab4, tab1.select(("a",)))
        tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
        self.assertEqual(tab5, tab1.select(("index", "a")))
        tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
        self.assertEqual(tab6, tab1.select(("ddd",)))
        tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
        self.assertEqual(tab7, tab1.select(("a",)))
        # Passing an unrecognized column should be a ValueError.
        with self.assertRaises(ValueError):
            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})
@unittest.skipUnless(np is not None, "Cannot test compute_row_group_size without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test compute_row_group_size without pyarrow.")
class ComputeRowGroupSizeTestCase(unittest.TestCase):
    """Tests for compute_row_group_size."""

    def testRowGroupSizeNoMetadata(self):
        """Row-group size estimate for a schema carrying no metadata."""
        np_table = _makeSimpleNumpyTable(include_multidim=True)

        # We can't use the numpy_to_arrow convenience function because
        # that adds metadata.
        arrow_types = _numpy_dtype_to_arrow_types(np_table.dtype)
        bare_schema = pa.schema(arrow_types)
        columns = _numpy_style_arrays_to_arrow_arrays(
            np_table.dtype,
            len(np_table),
            np_table,
            bare_schema,
        )
        arrow_table = pa.Table.from_arrays(columns, schema=bare_schema)

        size = compute_row_group_size(arrow_table.schema)
        self.assertGreater(size, 1_000_000)
        self.assertLess(size, 2_000_000)

    def testRowGroupSizeWithMetadata(self):
        """Row-group size estimate for a schema with conversion metadata."""
        arrow_table = numpy_to_arrow(_makeSimpleNumpyTable(include_multidim=True))

        size = compute_row_group_size(arrow_table.schema)
        self.assertGreater(size, 1_000_000)
        self.assertLess(size, 2_000_000)

    def testRowGroupSizeTinyTable(self):
        """A minimal one-column table still yields a sizeable row group."""
        arrow_table = numpy_to_arrow(np.zeros(1, dtype=[("a", np.bool_)]))

        size = compute_row_group_size(arrow_table.schema)
        self.assertGreater(size, 1_000_000)

    @unittest.skipUnless(pd is not None, "Cannot run testRowGroupSizeDataFrameWithLists without pandas.")
    def testRowGroupSizeDataFrameWithLists(self):
        """Row-group size estimate for a dataframe with list columns."""
        frame = pd.DataFrame(
            {"a": np.zeros(10), "b": [[0, 0]] * 10, "c": [[0.0, 0.0]] * 10, "d": [[]] * 10}
        )
        arrow_table = pandas_to_arrow(frame)

        size = compute_row_group_size(arrow_table.schema)
        self.assertGreater(size, 1_000_000)
def _checkAstropyTableEquality(table1, table2, skip_units=False, has_bigendian=False):
    """Check if two astropy tables have the same columns/values.

    Raises `AssertionError` if the tables differ.

    Parameters
    ----------
    table1 : `astropy.table.Table`
        Reference table.
    table2 : `astropy.table.Table`
        Table to compare against the reference.
    skip_units : `bool`
        If `True`, skip comparison of column units/descriptions/formats.
    has_bigendian : `bool`
        If `True`, compare column dtypes ignoring byte order.
    """
    if not has_bigendian:
        assert table1.dtype == table2.dtype
    else:
        for name in table1.dtype.names:
            # Only check that the type matches, normalizing both sides
            # to big-endian so byte order is ignored.
            assert table1.dtype[name].newbyteorder(">") == table2.dtype[name].newbyteorder(">")

    # Strip provenance before comparison.
    DatasetProvenance.strip_provenance_from_flat_dict(table1.meta)
    DatasetProvenance.strip_provenance_from_flat_dict(table2.meta)
    assert table1.meta == table2.meta
    if not skip_units:
        for name in table1.columns:
            assert table1[name].unit == table2[name].unit
            assert table1[name].description == table2[name].description
            assert table1[name].format == table2[name].format

    for name in table1.columns:
        # We need to check masked/regular columns after filling.
        has_masked = False
        if isinstance(table1[name], atable.column.MaskedColumn):
            c1 = table1[name].filled()
            has_masked = True
        else:
            c1 = np.array(table1[name])
        if has_masked:
            # A masked reference column requires a masked comparison column.
            assert isinstance(table2[name], atable.column.MaskedColumn)
            c2 = table2[name].filled()
        else:
            assert not isinstance(table2[name], atable.column.MaskedColumn)
            c2 = np.array(table2[name])
        np.testing.assert_array_equal(c1, c2)
        # If we have a masked column then we test the underlying data.
        if has_masked:
            np.testing.assert_array_equal(np.array(c1), np.array(c2))
            np.testing.assert_array_equal(table1[name].mask, table2[name].mask)
2518def _checkNumpyTableEquality(table1, table2, has_bigendian=False):
2519 """Check if two numpy tables have the same columns/values
2521 Parameters
2522 ----------
2523 table1 : `numpy.ndarray`
2524 table2 : `numpy.ndarray`
2525 has_bigendian : `bool`
2526 """
2527 assert table1.dtype.names == table2.dtype.names
2528 for name in table1.dtype.names:
2529 if not has_bigendian:
2530 assert table1.dtype[name] == table2.dtype[name]
2531 else:
2532 # Only check type matches, force to little-endian.
2533 assert table1.dtype[name].newbyteorder(">") == table2.dtype[name].newbyteorder(">")
2534 assert np.all(table1 == table2)
2537def _checkNumpyDictEquality(dict1, dict2):
2538 """Check if two numpy dicts have the same columns/values.
2540 Parameters
2541 ----------
2542 dict1 : `dict` [`str`, `np.ndarray`]
2543 dict2 : `dict` [`str`, `np.ndarray`]
2544 """
2545 assert set(dict1.keys()) == set(dict2.keys())
2546 for name in dict1:
2547 assert dict1[name].dtype == dict2[name].dtype
2548 assert np.all(dict1[name] == dict2[name])
# Allow running this test module directly with python.
if __name__ == "__main__":
    unittest.main()