dask: Inconsistent Schema with partitioned pyarrow files
When writing partitioned datasets to parquet files and there happens to be a partition in which one column has no values, inconsistent schemas are written to the parquet files, which prevents reading them later:
import dask as da
import dask.dataframe as dd
import pandas as pd
import numpy as np
import os
import shutil
PATH_DASK = '/tmp/dask.pa'
if os.path.exists(PATH_DASK):
    shutil.rmtree(PATH_DASK)
arrays = np.array([np.array([0, 1, 2]), np.array([3, 4]), np.nan, np.nan])
strings = np.array(['a', 'b', np.nan, np.nan])
df = pd.DataFrame([0, 0, 1, 1], columns=['partition_column'])
df['arrays'] = pd.Series(arrays)
df['strings'] = pd.Series(strings)
da_df = dd.from_pandas(df, npartitions=2)
da_df.to_parquet(PATH_DASK, partition_on=['partition_column'])
dd.read_parquet(PATH_DASK)
Fails with:
ValueError: Schema in partition[partition_column=1] /tmp/dask.pa/partition_column=1/4cd3552267d84d66a5ec8764f869a13b.parquet was different.
arrays: null
strings: string
__index_level_0__: int64
metadata
--------
...
vs
arrays: list<item: int64>
child 0, item: int64
strings: string
__index_level_0__: int64
metadata
--------
I found #3454, which relates to https://issues.apache.org/jira/browse/ARROW-2860. However, that pyarrow issue only concerns reading files with different schemas. Pyarrow itself does not even suffer from this bug, because pyarrow would not write those differing schemas in the first place:
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import os
import shutil
PATH_PYARROW = '/tmp/pyarrow.pa'
if os.path.exists(PATH_PYARROW):
    shutil.rmtree(PATH_PYARROW)
arrays = np.array([np.array([0, 1, 2]), np.array([3, 4]), np.nan, np.nan])
strings = np.array([np.nan, np.nan, 'a', 'b'])
df = pd.DataFrame([0, 0, 1, 1], columns=['partition_column'])
df['arrays'] = pd.Series(arrays)
df['strings'] = pd.Series(strings)
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, root_path=PATH_PYARROW, partition_cols=['partition_column'])
dataset = pq.ParquetDataset(PATH_PYARROW)
df_read = dataset.read().to_pandas()
df_read
This works perfectly fine. Even reading the pyarrow files with Dask now works:
dd.read_parquet(PATH_PYARROW).compute()
This is simply because pyarrow still uses the same schema across partitions even if there is missing data:
import os
files = [os.path.join(root, name)
         for root, _, names in os.walk(PATH_PYARROW)
         for name in names]
for file in files:
    dataset_part = pq.ParquetDataset(file)
    print(file)
    print(dataset_part.schema)
    print(dataset_part.read().to_pandas())
Outputs:
/tmp/pyarrow.pa/partition_column=0/f32bf0bfedcc4312829e7c4371e544e6.parquet
<pyarrow._parquet.ParquetSchema object at 0x7fdd1c103eb8>
arrays.list.item: INT64
strings: BYTE_ARRAY UTF8
__index_level_0__: INT64
arrays strings
0 [0, 1, 2] nan
1 [3, 4] nan
/tmp/pyarrow.pa/partition_column=1/044dd125e0b5472db23bff9a954a1bff.parquet
<pyarrow._parquet.ParquetSchema object at 0x7fdd1c1034e0>
arrays.list.item: INT64
strings: BYTE_ARRAY UTF8
__index_level_0__: INT64
arrays strings
2 None a
3 None b
Summing it up: in pyarrow, the pyarrow.parquet.write_to_dataset wrapper around pyarrow.parquet.write_table makes sure the schemas of the individual files stay consistent. Dask blindly calls pyarrow.parquet.write_table on each partition and hence ends up with a wrong schema.
This means that even if https://issues.apache.org/jira/browse/ARROW-2860 is fixed, loading these mismatched schemas might work, but the files are still written with the wrong schemas in the first place.
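To make the mechanism concrete, here is a minimal sketch (independent of Dask, using only pandas and pyarrow; the column name mirrors the reproduction above) of how per-partition schema inference produces the divergence:
import numpy as np
import pandas as pd
import pyarrow as pa

# Full frame: 'arrays' holds lists in one half and only NaN in the other.
full = pd.DataFrame({'arrays': pd.Series([np.array([0, 1]), np.array([2]), np.nan, np.nan])})

# Converting the full frame infers list<item: int64> for 'arrays' ...
print(pa.Table.from_pandas(full).schema)

# ... but converting only the all-NaN half re-infers the type as 'null',
# which is exactly how the per-file schemas end up diverging.
print(pa.Table.from_pandas(full.iloc[2:]).schema)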
I believe that fixing this in Dask is probably going to be hard, but it might still be worth a try. One simple but not ideal solution would be to stick to the schema produced by the first written partition and raise an error otherwise. At the very least we should make sure that, after all partitions are written to file, the schemas match. Otherwise people will save their files without any error being raised and only notice later that they can no longer be opened.
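To illustrate the kind of post-write check suggested above, here is a rough sketch (not an actual or proposed Dask implementation; the function name is made up): it compares the arrow schema of every written file against the first one and fails loudly on a mismatch:
import glob
import os
import pyarrow.parquet as pq

def check_consistent_schemas(root_path):
    # Collect every parquet file in the partitioned dataset.
    files = sorted(glob.glob(os.path.join(root_path, '**', '*.parquet'), recursive=True))
    reference = pq.read_schema(files[0])
    for path in files[1:]:
        schema = pq.read_schema(path)
        if not schema.equals(reference):
            raise ValueError('Schema of {} differs from {}'.format(path, files[0]))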
About this issue
- Original URL
- State: closed
- Created 6 years ago
- Comments: 48 (43 by maintainers)
I’ve just run into the same problem. I understand that I can use pyarrow.parquet.write_to_dataset as a workaround, right? But I would like to do it properly using the Dask API. Is someone working on this issue? Can I help?
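For reference, one possible shape of that workaround (an assumption on my part, not an endorsed Dask pattern) is to materialize the data and let pyarrow do the partitioned write, so a single schema is inferred from the full table:
import pyarrow as pa
import pyarrow.parquet as pq

# da_df is the Dask DataFrame from the reproduction above.
pdf = da_df.compute()                      # materialize to a single pandas DataFrame
table = pa.Table.from_pandas(pdf)          # one schema, inferred from all the data
pq.write_to_dataset(table, root_path='/tmp/workaround.pa',
                    partition_cols=['partition_column'])
This obviously only helps when the data fits into memory on a single machine, which is why a proper fix in Dask is still needed.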
I was having this problem too and the following, not very elegant, patch fixes it for my data:
I then manually create a pyarrow schema and pass it in:
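The original snippet is not reproduced here, but a sketch of the general idea might look like the following; note that the schema= keyword of dask.dataframe.to_parquet is only available in newer Dask versions with the pyarrow engine, and the exact types are an assumption based on the reproduction above:
import pyarrow as pa

schema = pa.schema([
    ('partition_column', pa.int64()),
    ('arrays', pa.list_(pa.int64())),
    ('strings', pa.string()),
])
# Pin one schema for every partition instead of letting each one be re-inferred.
da_df.to_parquet(PATH_DASK, engine='pyarrow',
                 partition_on=['partition_column'], schema=schema)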
@birdsarah you create the timestamp field with pa.timestamp('ns'), but if you have tz-aware data, this would need to be pa.timestamp('ns', 'UTC'). That fails when creating the empty pandas DataFrame:
But that can be solved by specifying ignore_metadata=True. The reason it originally fails is that the pandas metadata created when converting the empty DataFrame to an arrow Table do not match the actual schema (and in the case of timezone-aware data, those metadata are used, but the expected timezone is not found).
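A minimal sketch of those two points (illustrative only, not the code from this thread): tz-aware data maps to a timestamp type with a timezone, and to_pandas(ignore_metadata=True) skips the embedded pandas metadata when building an empty frame from an existing schema:
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'ts': pd.to_datetime(['2019-01-01']).tz_localize('UTC')})
table = pa.Table.from_pandas(df)
print(table.schema)                       # shows ts: timestamp[ns, tz=UTC], i.e. pa.timestamp('ns', 'UTC')

empty = table.slice(0, 0)                 # zero-row table with the same schema and metadata
meta_df = empty.to_pandas(ignore_metadata=True)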
@birdsarah do you mean in light of manually creating the schema (as this is the workaround you are doing above)?
In arrow, Categoricals map to DictionaryType:
So you can create the type manually to add this to the schema (from the datatype of the indices, and the unique categories):
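For illustration, a sketch of doing this by hand (the column name and categories are made up; with recent pyarrow, pa.dictionary takes the index type and the value type):
import pandas as pd
import pyarrow as pa

cat_dtype = pd.CategoricalDtype(['a', 'b', 'c'])            # categories known up front
codes_type = pa.from_numpy_dtype(pd.Categorical([], dtype=cat_dtype).codes.dtype)
dict_type = pa.dictionary(codes_type, pa.string())          # index type, value type
schema = pa.schema([('category_column', dict_type)])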
I think you should be able to get the categories from the dtype of the empty _meta?
Indeed, when passing an empty DataFrame to arrow, it will infer all object columns as 'null' type.
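A quick demonstration of that inference (illustrative only):
import pandas as pd
import pyarrow as pa

empty = pd.DataFrame({'strings': pd.Series([], dtype=object)})
print(pa.Table.from_pandas(empty).schema)    # the 'strings' column comes out as 'null', not 'string'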