dask: Unable to write parquet with "empty" categorical column with pyarrow engine

I encountered this issue while concatenating multiple files. It can also be reproduced with a single file when a partition contains a categorical column with no values.

This may be related to issue 6243. If I am not mistaken, this worked with pyarrow 0.17.1.

import dask.dataframe as dd
import pandas as pd

names = ['name']
address = names

df1 = dd.from_pandas(pd.DataFrame({'name': names}), npartitions=1)
df2 = dd.from_pandas(pd.DataFrame({'address': address}), npartitions=1)
df1['name'] = df1['name'].astype('category')

df = dd.concat([df1, df2])
del df['address']

df.to_parquet('does_not_work')

The error that is raised:

RuntimeError                              Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
     33     try:
---> 34         metadata.append_row_groups(md)
     35     except RuntimeError as err:

/opt/conda/lib/python3.8/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.FileMetaData.append_row_groups()

RuntimeError: AppendRowGroups requires equal schemas.

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-46-4faae0e6b3fa> in <module>
     12 del df['address']
     13 
---> 14 df.to_parquet('does_not_work')

/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
   3948         from .io import to_parquet
   3949 
-> 3950         return to_parquet(self, path, *args, **kwargs)
   3951 
   3952     @derived_from(pd.DataFrame)

/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, ignore_divisions, partition_on, storage_options, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
    506         if compute_kwargs is None:
    507             compute_kwargs = dict()
--> 508         out = out.compute(**compute_kwargs)
    509     return out
    510 

/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    445         postcomputes.append(x.__dask_postcompute__())
    446 
--> 447     results = schedule(dsk, keys, **kwargs)
    448     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    449 

/opt/conda/lib/python3.8/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     74                 pools[thread][num_workers] = pool
     75 
---> 76     results = get_async(
     77         pool.apply_async,
     78         len(pool._pool),

/opt/conda/lib/python3.8/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484                         _execute_task(task, data)  # Re-execute locally
    485                     else:
--> 486                         raise_exception(exc, tb)
    487                 res, worker_id = loads(res_info)
    488                 state["cache"][key] = res

/opt/conda/lib/python3.8/site-packages/dask/local.py in reraise(exc, tb)
    314     if exc.__traceback__ is not tb:
    315         raise exc.with_traceback(tb)
--> 316     raise exc
    317 
    318 

/opt/conda/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/opt/conda/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/opt/conda/lib/python3.8/site-packages/dask/utils.py in apply(func, args, kwargs)
     27 def apply(func, args, kwargs=None):
     28     if kwargs:
---> 29         return func(*args, **kwargs)
     30     else:
     31         return func(*args)

/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in write_metadata(parts, fmd, fs, path, append, **kwargs)
   1034                 i_start = 1
   1035             for i in range(i_start, len(parts)):
-> 1036                 _append_row_groups(_meta, parts[i][0]["meta"])
   1037             with fs.open(metadata_path, "wb") as fil:
   1038                 _meta.write_metadata_file(fil)

/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
     35     except RuntimeError as err:
     36         if "requires equal schemas" in str(err):
---> 37             raise RuntimeError(
     38                 "Schemas are inconsistent, try using "
     39                 '`to_parquet(..., schema="infer")`, or pass an explicit '

RuntimeError: Schemas are inconsistent, try using `to_parquet(..., schema="infer")`, or pass an explicit pyarrow schema.
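
For reference, the error message points at passing schema="infer" or an explicit pyarrow schema to to_parquet. A minimal sketch of both options against the reproducer above (not verified to resolve this particular case, and the dictionary type chosen for the 'name' column is an assumption):

import pyarrow as pa

# Option 1: ask dask to infer a consistent schema across partitions
df.to_parquet('workaround_infer', engine='pyarrow', schema='infer')

# Option 2: pass an explicit pyarrow schema (the dictionary type for 'name' is assumed)
explicit_schema = pa.schema([('name', pa.dictionary(pa.int32(), pa.string()))])
df.to_parquet('workaround_explicit', engine='pyarrow', schema=explicit_schema)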

Environment:

  • Dask version: 2.24.0
  • Pyarrow version: 1.0.1
  • Python version: 3.8
  • Operating System: Dask-Dev image
  • Install method (conda, pip, source): Dask-Dev image

About this issue

  • State: open
  • Created 4 years ago
  • Comments: 23 (17 by maintainers)

Most upvoted comments

@rjzamora in case you have thoughts on whether we can do something better from the dask side.

With engine="pyarrow" it still fails with master versions of both.

Ok phew. I was starting to get paranoid 😃