dask: [bug] dask.dataframe.DataFrame.merge fails for inner join

Setup

from pandas import DataFrame
from dask.delayed import delayed
from dask.dataframe import from_delayed

A = from_delayed([
  delayed(DataFrame)({'x': range(i, i+5), 'a': range(i, i+5)})
  for i in range(0, 10, 5)
])

B = from_delayed([
  delayed(DataFrame)({'x': range(i, i+5), 'b': range(i, i+5)})
  for i in range(0, 10, 5)
])

C = A.merge(
  B, on=('x',), how='inner'
)

Traceback

>>> C.compute()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/conda/lib/python3.6/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/base.py", line 398, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/threaded.py", line 76, in get
    pack_exception=pack_exception, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/local.py", line 462, in get_async
    raise_exception(exc, tb)
  File "/conda/lib/python3.6/site-packages/dask/compatibility.py", line 112, in reraise
    raise exc
  File "/conda/lib/python3.6/site-packages/dask/local.py", line 230, in execute_task
    result = _execute_task(task, data)
  File "/conda/lib/python3.6/site-packages/dask/core.py", line 119, in _execute_task
    return func(*args2)
  File "/conda/lib/python3.6/site-packages/dask/optimization.py", line 942, in __call__
    dict(zip(self.inkeys, args)))
  File "/conda/lib/python3.6/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/conda/lib/python3.6/site-packages/dask/core.py", line 119, in _execute_task
    return func(*args2)
  File "/conda/lib/python3.6/site-packages/dask/compatibility.py", line 93, in apply
    return func(*args, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/dataframe/core.py", line 3794, in apply_and_enforce
    df = func(*args, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/dataframe/shuffle.py", line 417, in partitioning_index
    return hash_pandas_object(df, index=False) % int(npartitions)
  File "/conda/lib/python3.6/site-packages/pandas/core/util/hashing.py", line 117, in hash_pandas_object
    h = Series(h, index=obj.index, dtype='uint64', copy=False)
  File "/conda/lib/python3.6/site-packages/pandas/core/series.py", line 262, in __init__
    .format(val=len(data), ind=len(index)))
ValueError: Length of passed values is 0, index implies 5

Version

>>> pandas.__version__
'0.23.4'
>>> dask.__version__
'1.1.4'

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 15 (8 by maintainers)

Most upvoted comments

@TomAugspurger Yes. It may be related to this issue.

In my case, the inner join results look more like an outer join: I am getting additional unmatched rows when joining multiple dataframes on a distributed system. I tried merging with and without the task-based shuffle (shuffle='task'). No luck though 😦

I also tried to:

  • convert back to a pandas dataframe and reset the index
  • merge the results in pandas; the results are still not what an inner join should produce

Is there any workaround for this?

Kindly provide your thoughts on this!