dask: Inconsistent Schema with partitioned pyarrow files

When writing partitioned Datasets to Parquet files, and it so happens that there is a partition in which one column has no values, inconsistent schemas are written to the individual Parquet files, which prevents reading the dataset back later:

import dask as da
import dask.dataframe as dd
import pandas as pd
import numpy as np
import os
import shutil

PATH_DASK = '/tmp/dask.pa'

if os.path.exists(PATH_DASK):
    shutil.rmtree(PATH_DASK)

arrays = np.array([np.array([0, 1, 2]), np.array([3, 4]), np.nan, np.nan], dtype=object)
strings = np.array(['a', 'b', np.nan, np.nan])

df = pd.DataFrame([0, 0, 1, 1], columns=['partition_column'])
df['arrays'] = pd.Series(arrays)
df['strings'] = pd.Series(strings)

da_df = dd.from_pandas(df, npartitions=2)
da_df.to_parquet(PATH_DASK, partition_on=['partition_column'])
dd.read_parquet(PATH_DASK)

Fails with:

ValueError: Schema in partition[partition_column=1] /tmp/dask.pa/partition_column=1/4cd3552267d84d66a5ec8764f869a13b.parquet was different. 
arrays: null
strings: string
__index_level_0__: int64
metadata
--------
...

vs

arrays: list<item: int64>
  child 0, item: int64
strings: string
__index_level_0__: int64
metadata
--------

I found #3454, which relates to https://issues.apache.org/jira/browse/ARROW-2860. However, that PyArrow issue only concerns reading files with different schemas. PyArrow in itself doesn't even suffer from this bug, as PyArrow would not write those differing schemas in the first place:

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import os
import shutil

PATH_PYARROW = '/tmp/pyarrow.pa'

if os.path.exists(PATH_PYARROW):
    shutil.rmtree(PATH_PYARROW)

arrays = np.array([np.array([0, 1, 2]), np.array([3, 4]), np.nan, np.nan], dtype=object)
strings = np.array([np.nan, np.nan, 'a', 'b'])

df = pd.DataFrame([0, 0, 1, 1], columns=['partition_column'])
df['arrays'] = pd.Series(arrays)
df['strings'] = pd.Series(strings)

table = pa.Table.from_pandas(df)

pq.write_to_dataset(table, root_path=PATH_PYARROW, partition_cols=['partition_column'])

dataset = pq.ParquetDataset(PATH_PYARROW)
df_read = dataset.read().to_pandas()
df_read

Works perfectly fine. Even reading the PyArrow-written files with Dask now works perfectly fine: dd.read_parquet(PATH_PYARROW).compute()

This is simply because PyArrow keeps the same schema across partitions even if a partition has missing data:

import os
files = [os.path.join(root, name)
         for root, _, names in os.walk(PATH_PYARROW)
         for name in names]
for file in files:
    dataset_part = pq.ParquetDataset(file)
    print(file)
    print(dataset_part.schema)
    print(dataset_part.read().to_pandas())

Outputs:

/tmp/pyarrow.pa/partition_column=0/f32bf0bfedcc4312829e7c4371e544e6.parquet
<pyarrow._parquet.ParquetSchema object at 0x7fdd1c103eb8>
arrays.list.item: INT64
strings: BYTE_ARRAY UTF8
__index_level_0__: INT64
 
      arrays strings
0  [0, 1, 2]     nan
1     [3, 4]     nan
/tmp/pyarrow.pa/partition_column=1/044dd125e0b5472db23bff9a954a1bff.parquet
<pyarrow._parquet.ParquetSchema object at 0x7fdd1c1034e0>
arrays.list.item: INT64
strings: BYTE_ARRAY UTF8
__index_level_0__: INT64
 
  arrays strings
2   None       a
3   None       b

So, summing it up: in PyArrow, the pyarrow.parquet.write_to_dataset wrapper around pyarrow.parquet.write_table takes care that the schemas of the individual files don't get screwed up. Dask blindly uses pyarrow.parquet.write_table on each partition and hence ends up with mismatching schemas.
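To illustrate the root cause (a minimal standalone sketch, not the actual Dask code path): converting each pandas partition to an Arrow table independently makes PyArrow infer a different type for a column that has no values in one of the partitions.

import numpy as np
import pandas as pd
import pyarrow as pa

# Partition 0 contains actual list data; partition 1 only missing values
# (object dtype, as it is after slicing the original object column).
part0 = pd.DataFrame({'arrays': pd.Series([np.array([0, 1, 2]), np.array([3, 4])], dtype=object)})
part1 = pd.DataFrame({'arrays': pd.Series([np.nan, np.nan], dtype=object)})

# Independent schema inference: part0 should come out as list<item: int64>,
# while part1 has nothing to infer from and falls back to the null type.
print(pa.Table.from_pandas(part0, preserve_index=False).schema)
print(pa.Table.from_pandas(part1, preserve_index=False).schema)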

This means that even if https://issues.apache.org/jira/browse/ARROW-2860 is fixed, loading these mismatching schemas might work, but they are still written wrongly to the files in the first place.

I believe that fixing this in Dask is probably going to be hard, but it might still be worth a try. One simple but not ideal solution would be to stick to the schema that the first written partition produces and raise an error otherwise. At the very least we should make sure that, after all partitions are written to file, the schemas match. Otherwise people will save their files without any error being raised and only notice later that opening them is no longer possible.
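A minimal sketch of that post-write check (a hypothetical helper, not existing Dask API; it assumes the pyarrow engine and simply compares the Arrow schema of every file in the dataset):

import os
import pyarrow.parquet as pq

def check_consistent_schemas(root_path):
    """Raise if the parquet files under root_path do not all share one schema."""
    paths = [os.path.join(dirpath, name)
             for dirpath, _, names in os.walk(root_path)
             for name in names if name.endswith('.parquet')]
    if not paths:
        return
    reference = pq.ParquetFile(paths[0]).schema.to_arrow_schema()
    for path in paths[1:]:
        schema = pq.ParquetFile(path).schema.to_arrow_schema()
        if not schema.equals(reference):
            raise ValueError('Schema of %s differs from %s' % (path, paths[0]))

Running such a check right after to_parquet would at least turn the silent corruption into an immediate error.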

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 48 (43 by maintainers)

Most upvoted comments

I've just run into the same problem. I understand that I can use pyarrow.parquet.write_to_dataset as a workaround, right? But I would like to do it properly using the Dask API. Is someone working on this issue? Can I help?
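For reference, one possible shape of that workaround (a rough, hypothetical sketch, not from this thread): hand each Dask partition to pyarrow.parquet.write_to_dataset yourself, with an explicit schema so that empty partitions cannot change the inferred types. It assumes the reproducer above, with ddf the Dask dataframe and PATH_DASK the output path.

import pyarrow as pa
import pyarrow.parquet as pq

# Explicit schema matching the reproducer's columns.
schema = pa.schema([('partition_column', pa.int64()),
                    ('arrays', pa.list_(pa.int64())),
                    ('strings', pa.string())])

for part in ddf.to_delayed():
    pdf = part.compute()
    # The fixed schema keeps all-NaN columns from being inferred as null.
    table = pa.Table.from_pandas(pdf, schema=schema, preserve_index=False)
    pq.write_to_dataset(table, root_path=PATH_DASK,
                        partition_cols=['partition_column'])

(This computes the partitions one by one, trading parallelism for a consistent schema.)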

I was having this problem too and the following, not very elegant, patch fixes it for my data:

From 396492823960544fbf7632450562908d516df069 Mon Sep 17 00:00:00 2001
From: Sarah Bird <birdsarah@mozilla.com>
Date: Sat, 20 Apr 2019 03:12:51 -0500
Subject: [PATCH 1/1] First hack to pass in schema

---
 dask/dataframe/io/parquet.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/dask/dataframe/io/parquet.py b/dask/dataframe/io/parquet.py
index 81e2a846..8015dba9 100644
--- a/dask/dataframe/io/parquet.py
+++ b/dask/dataframe/io/parquet.py
@@ -958,7 +958,7 @@ def _read_pyarrow_parquet_piece(fs, piece, columns, index_cols, is_series,
 
 _pyarrow_write_table_kwargs = {'row_group_size', 'version', 'use_dictionary',
                                'compression', 'use_deprecated_int96_timestamps',
-                               'coerce_timestamps', 'flavor', 'chunk_size'}
+                               'coerce_timestamps', 'flavor', 'chunk_size', 'pyarrow_schema'}
 
 _pyarrow_write_metadata_kwargs = {'version', 'use_deprecated_int96_timestamps',
                                   'coerce_timestamps'}
@@ -1000,7 +1000,12 @@ def _write_partition_pyarrow(df, path, fs, filename, write_index,
                              partition_on, metadata_path=None, **kwargs):
     import pyarrow as pa
     from pyarrow import parquet
-    t = pa.Table.from_pandas(df, preserve_index=write_index)
+
+    if 'pyarrow_schema' in kwargs:
+        pyarrow_schema = kwargs.pop('pyarrow_schema')
+        t = pa.Table.from_pandas(df, preserve_index=write_index, schema=pyarrow_schema)
+    else:
+        t = pa.Table.from_pandas(df, preserve_index=write_index)
 
     if partition_on:
         parquet.write_to_dataset(t, path, partition_cols=partition_on,
-- 
2.20.1

I then manually create a pyarrow schema and pass it in:

df.to_parquet(
    path='out.parquet',
    engine='pyarrow',
    pyarrow_schema=pa.schema([
        ('field_1', pa.string()),
        ('field_2', pa.int64()),
    ]),
)

@birdsarah you create the timestamp field with pa.timestamp('ns'), but if you have tz-aware data, this would need to be pa.timestamp('ns', 'UTC').

That fails when creating the empty pandas DataFrame:

In [58]: schema = pa.schema([('timestamps', pa.timestamp('us', 'UTC'))])

In [59]: pa.Table.from_pandas(pd.DataFrame(columns=schema.names),
    ...:                      schema=schema, preserve_index=False).to_pandas()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-59-3e6fd251c01b> in <module>
----> 1 pa.Table.from_pandas(pd.DataFrame(columns=schema.names), schema=schema, preserve_index=False).to_pandas()

~/miniconda3/envs/dev37/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib._PandasConvertible.to_pandas()

~/miniconda3/envs/dev37/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.Table._to_pandas()

~/miniconda3/envs/dev37/lib/python3.7/site-packages/pyarrow/pandas_compat.py in table_to_blockmanager(options, table, categories, ignore_metadata)
    600         column_indexes = pandas_metadata.get('column_indexes', [])
    601         index_descriptors = pandas_metadata['index_columns']
--> 602         table = _add_any_metadata(table, pandas_metadata)
    603         table, index = _reconstruct_index(table, index_descriptors,
    604                                           all_columns)

~/miniconda3/envs/dev37/lib/python3.7/site-packages/pyarrow/pandas_compat.py in _add_any_metadata(table, pandas_metadata)
    927                 col = table[idx]
    928                 converted = col.to_pandas()
--> 929                 tz = col_meta['metadata']['timezone']
    930                 tz_aware_type = pa.timestamp('ns', tz=tz)
    931                 with_metadata = pa.Array.from_pandas(converted.values,

TypeError: 'NoneType' object is not subscriptable

But that can be solved by specifying ignore_metadata=True

In [60]: pa.Table.from_pandas(pd.DataFrame(columns=schema.names),
    ...:                      schema=schema, preserve_index=False).to_pandas(ignore_metadata=True)
Out[60]: 
Empty DataFrame
Columns: [timestamps]
Index: []

In [61]: _.dtypes
Out[61]: 
timestamps    datetime64[ns, UTC]
dtype: object

The reason it originally fails is that the pandas metadata created when converting the empty DataFrame to an Arrow Table do not match the actual schema (and in the case of timezone-aware data, that metadata is used, but the expected timezone is not found).

Do you know how to handle categoricals?

@birdsarah do you mean in light of manually creating the schema (as this is the workaround you are doing above)?

In arrow, Categoricals map to DictionaryType:

In [23]: cat = pd.Series(pd.Categorical(['a', 'b', 'a']))

In [24]: a = pyarrow.array(cat)

In [25]: a
Out[25]: 
<pyarrow.lib.DictionaryArray object at 0x7f0c56993ca8>

-- dictionary:
  [
    "a",
    "b"
  ]
-- indices:
  [
    0,
    1,
    0
  ]

In [26]: a.type
Out[26]: DictionaryType(dictionary<values=string, indices=int8, ordered=0>)

So you can create the type manually to add this to the schema (from the datatype of the indices, and the unique categories):

In [30]: pa.dictionary(pa.int8(), pa.array(['a', 'b']))
Out[30]: DictionaryType(dictionary<values=string, indices=int8, ordered=0>)

I think you should be able to get the categories from the dtype of the empty _meta?
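For example (a rough sketch, assuming a Dask dataframe ddf with a known-categorical column 'cat_col'; it uses the older pa.dictionary(indices, values_array) signature shown above, while newer PyArrow versions take a value type such as pa.dictionary(pa.int8(), pa.string()) instead):

import pyarrow as pa

# The empty _meta still carries the CategoricalDtype, including the categories
# (as long as Dask knows them, i.e. they are not "unknown" categoricals).
cat_dtype = ddf._meta['cat_col'].dtype
dict_type = pa.dictionary(pa.int8(), pa.array(cat_dtype.categories))
schema = pa.schema([('cat_col', dict_type)])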

the meta is usually empty, so calling pyarrow on it directly to make a schema would produce exactly the wrong thing 😐

Indeed, when passing an empty DataFrame to arrow, it will infer all object columns as ‘null’ type.
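This is easy to verify (a minimal check, not from the thread):

import pandas as pd
import pyarrow as pa

empty = pd.DataFrame({'col': pd.Series([], dtype=object)})
# With no values to look at, the object column is inferred as the null type.
print(pa.Table.from_pandas(empty, preserve_index=False).schema)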