dask: Unable to write parquet with "empty" categorical column with pyarrow engine
I encountered this while working on a concat involving multiple files, but the same issue also occurs with a single file when a partition contains a categorical column with no values.
This may be related to Issue 6243. If I am not mistaken, it worked with Pyarrow 0.17.1.
import dask.dataframe as dd
import pandas as pd

# Two single-partition frames with different columns.
names = ['name']
address = names
df1 = dd.from_pandas(pd.DataFrame({'name': names}), npartitions=1)
df2 = dd.from_pandas(pd.DataFrame({'address': address}), npartitions=1)

# Casting 'name' to categorical before the concat leaves the partition
# coming from df2 with an empty categorical 'name' column.
df1['name'] = df1['name'].astype('category')
df = dd.concat([df1, df2])
del df['address']

df.to_parquet('does_not_work')
The error that is raised:
RuntimeError Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
33 try:
---> 34 metadata.append_row_groups(md)
35 except RuntimeError as err:
/opt/conda/lib/python3.8/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.FileMetaData.append_row_groups()
RuntimeError: AppendRowGroups requires equal schemas.
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
<ipython-input-46-4faae0e6b3fa> in <module>
12 del df['address']
13
---> 14 df.to_parquet('does_not_work')
/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
3948 from .io import to_parquet
3949
-> 3950 return to_parquet(self, path, *args, **kwargs)
3951
3952 @derived_from(pd.DataFrame)
/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, ignore_divisions, partition_on, storage_options, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
506 if compute_kwargs is None:
507 compute_kwargs = dict()
--> 508 out = out.compute(**compute_kwargs)
509 return out
510
/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
165 dask.base.compute
166 """
--> 167 (result,) = compute(self, traverse=False, **kwargs)
168 return result
169
/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
445 postcomputes.append(x.__dask_postcompute__())
446
--> 447 results = schedule(dsk, keys, **kwargs)
448 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
449
/opt/conda/lib/python3.8/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
74 pools[thread][num_workers] = pool
75
---> 76 results = get_async(
77 pool.apply_async,
78 len(pool._pool),
/opt/conda/lib/python3.8/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
484 _execute_task(task, data) # Re-execute locally
485 else:
--> 486 raise_exception(exc, tb)
487 res, worker_id = loads(res_info)
488 state["cache"][key] = res
/opt/conda/lib/python3.8/site-packages/dask/local.py in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
317
318
/opt/conda/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
220 try:
221 task, data = loads(task_info)
--> 222 result = _execute_task(task, data)
223 id = get_id()
224 result = dumps((result, id))
/opt/conda/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
/opt/conda/lib/python3.8/site-packages/dask/utils.py in apply(func, args, kwargs)
27 def apply(func, args, kwargs=None):
28 if kwargs:
---> 29 return func(*args, **kwargs)
30 else:
31 return func(*args)
/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in write_metadata(parts, fmd, fs, path, append, **kwargs)
1034 i_start = 1
1035 for i in range(i_start, len(parts)):
-> 1036 _append_row_groups(_meta, parts[i][0]["meta"])
1037 with fs.open(metadata_path, "wb") as fil:
1038 _meta.write_metadata_file(fil)
/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
35 except RuntimeError as err:
36 if "requires equal schemas" in str(err):
---> 37 raise RuntimeError(
38 "Schemas are inconsistent, try using "
39 '`to_parquet(..., schema="infer")`, or pass an explicit '
RuntimeError: Schemas are inconsistent, try using `to_parquet(..., schema="infer")`, or pass an explicit pyarrow schema.
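For reference, here is a minimal sketch of the two workarounds the error message points to, applied to the reproducer above. The output paths and the dictionary index type are my own assumptions, and I have not verified that either one actually avoids the failure in this case:

import pyarrow as pa

# Option 1: ask dask to infer a common pyarrow schema across partitions.
df.to_parquet('with_inferred_schema', schema="infer")

# Option 2: pass an explicit pyarrow schema for the categorical column
# (the dictionary index type here is an arbitrary choice).
explicit_schema = pa.schema([("name", pa.dictionary(pa.int32(), pa.string()))])
df.to_parquet('with_explicit_schema', schema=explicit_schema)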
Environment:
- Dask version: 2.24.0
- Pyarrow version: 1.0.1
- Python version: 3.8
- Operating System: Dask-Dev image
- Install method (conda, pip, source): Dask-Dev image
About this issue
- State: open
- Created 4 years ago
- Comments: 23 (17 by maintainers)
@rjzamora in case you have thoughts on whether we can do something better from the dask side.
Ok phew. I was starting to get paranoid 😃