Setup
from pandas import DataFrame
from dask.delayed import delayed
from dask.dataframe import from_delayed

# Two dask dataframes with two partitions each, sharing the 'x' column
A = from_delayed([
    delayed(DataFrame)({'x': range(i, i+5), 'a': range(i, i+5)})
    for i in range(0, 10, 5)
])
B = from_delayed([
    delayed(DataFrame)({'x': range(i, i+5), 'b': range(i, i+5)})
    for i in range(0, 10, 5)
])

# Inner join on the shared column
C = A.merge(B, on=('x',), how='inner')
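
For reference, a plain-pandas sketch of what I expect the merge to return, building the same two frames eagerly (variable names here are just for illustration):

import pandas as pd

pa = pd.concat([pd.DataFrame({'x': range(i, i+5), 'a': range(i, i+5)})
                for i in range(0, 10, 5)], ignore_index=True)
pb = pd.concat([pd.DataFrame({'x': range(i, i+5), 'b': range(i, i+5)})
                for i in range(0, 10, 5)], ignore_index=True)

# Expected: 10 rows, with x == a == b in each row
expected = pa.merge(pb, on='x', how='inner')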
Traceback
>>> C.compute()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/conda/lib/python3.6/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/base.py", line 398, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/threaded.py", line 76, in get
    pack_exception=pack_exception, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/local.py", line 462, in get_async
    raise_exception(exc, tb)
  File "/conda/lib/python3.6/site-packages/dask/compatibility.py", line 112, in reraise
    raise exc
  File "/conda/lib/python3.6/site-packages/dask/local.py", line 230, in execute_task
    result = _execute_task(task, data)
  File "/conda/lib/python3.6/site-packages/dask/core.py", line 119, in _execute_task
    return func(*args2)
  File "/conda/lib/python3.6/site-packages/dask/optimization.py", line 942, in __call__
    dict(zip(self.inkeys, args)))
  File "/conda/lib/python3.6/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/conda/lib/python3.6/site-packages/dask/core.py", line 119, in _execute_task
    return func(*args2)
  File "/conda/lib/python3.6/site-packages/dask/compatibility.py", line 93, in apply
    return func(*args, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/dataframe/core.py", line 3794, in apply_and_enforce
    df = func(*args, **kwargs)
  File "/conda/lib/python3.6/site-packages/dask/dataframe/shuffle.py", line 417, in partitioning_index
    return hash_pandas_object(df, index=False) % int(npartitions)
  File "/conda/lib/python3.6/site-packages/pandas/core/util/hashing.py", line 117, in hash_pandas_object
    h = Series(h, index=obj.index, dtype='uint64', copy=False)
  File "/conda/lib/python3.6/site-packages/pandas/core/series.py", line 262, in __init__
    .format(val=len(data), ind=len(index)))
ValueError: Length of passed values is 0, index implies 5
Version
>>> pandas.__version__
'0.23.4'
>>> dask.__version__
'1.1.4'
@TomAugspurger Yes. It may be related to this issue.
In my case, the inner join results look more like an outer join: I get additional unmatched rows when joining multiple dataframes on a distributed system. I tried the merge with and without the task-level shuffle (shuffle='task'), roughly as sketched below, but no luck 😦
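
A minimal sketch of the kind of call I am making, reusing A and B from the setup above (dask documents 'tasks' as the value for the task-based shuffle keyword):

# Same merge as in the setup, but forcing the task-based shuffle;
# I also tried leaving shuffle unset, with the same result.
C_task = A.merge(B, on=('x',), how='inner', shuffle='tasks')
C_task.compute()
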
Also tried to
Is there any workaround for this?
Kindly provide your thoughts on this!