cudf: [BUG] Dask distributed scheduler merges fail by adding None column

When using the distributed scheduler, merges that include a string column introduce a None column name during intermediate processing, causing the merge to fail.

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Expected: ['b0', 'b1']
  Actual:   ['b0', None]

This does not happen when there are only numeric columns, nor does it happen when using a local distributed cluster with CPU-based Dask. When running on the single-machine scheduler, there are no errors.

Environment: Ubuntu 18.04, CUDA 10.1, in a rapidsai-dev-nightly container from 11/12/19

Distributed Scheduler

String column causes the error:

import cudf
import dask_cudf

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()
client = Client(cluster)

r1 = cudf.DataFrame()
r1['a1'] = range(4)
r1['a2'] = range(4,8)
r1['a3'] = range(4)

r2 = cudf.DataFrame()
r2['b0'] = range(4)
r2['b1'] = range(4)
r2['b1'] = r2.b1.astype('str')

d1 = dask_cudf.from_cudf(r1, 2)
d2 = dask_cudf.from_cudf(r2, 2)

res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
len(res)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-5-e523a3de6bef> in <module>
     14 
     15 res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
---> 16 len(res)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in __len__(self)
    502     def __len__(self):
    503         return self.reduction(
--> 504             len, np.sum, token="len", meta=int, split_every=False
    505         ).compute()
    506 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self, traverse=False, **kwargs)
    166         return result
    167 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk, keys, **kwargs)
    437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    438 

/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2543                     should_rejoin = False
   2544             try:
-> 2545                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2546             finally:
   2547                 for f in futures.values():

/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1843                 direct=direct,
   1844                 local_worker=local_worker,
-> 1845                 asynchronous=asynchronous,
   1846             )
   1847 

/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    760         else:
    761             return sync(
--> 762                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    763             )
    764 

/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    331     if error[0]:
    332         typ, exc, tb = error[0]
--> 333         raise exc.with_traceback(tb)
    334     else:
    335         return result[0]

/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in f()
    315             if callback_timeout is not None:
    316                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317             result[0] = yield future
    318         except Exception as exc:
    319             error[0] = sys.exc_info()

/opt/conda/envs/rapids/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1699                             exc = CancelledError(key)
   1700                         else:
-> 1701                             raise exception.with_traceback(traceback)
   1702                         raise exc
   1703                     if errors == "skip":

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/optimization.py in __call__()
    974         if not len(args) == len(self.inkeys):
    975             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 976         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    977 
    978     def __reduce__(self):

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/core.py in get()
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/core.py in _execute_task()
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/utils.py in apply()
     27 def apply(func, args, kwargs=None):
     28     if kwargs:
---> 29         return func(*args, **kwargs)
     30     else:
     31         return func(*args)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in apply_and_enforce()
   4851                     " the columns in the provided metadata\n"
   4852                     "  Expected: %s\n"
-> 4853                     "  Actual:   %s" % (str(list(meta.columns)), str(list(df.columns)))
   4854                 )
   4855             else:

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Expected: ['b0', 'b1']
  Actual:   ['b0', None]

Works fine with only numeric columns:

import cudf
import dask_cudf

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()
client = Client(cluster)

r1 = cudf.DataFrame()
r1['a1'] = range(4)
r1['a2'] = range(4,8)
r1['a3'] = range(4)

r2 = cudf.DataFrame()
r2['b0'] = range(4)
r2['b1'] = range(4)

d1 = dask_cudf.from_cudf(r1, 2)
d2 = dask_cudf.from_cudf(r2, 2)

res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
len(res)
4

Single Machine Scheduler

import cudf
import dask_cudf

r1 = cudf.DataFrame()
r1['a1'] = range(4)
r1['a2'] = range(4,8)
r1['a3'] = range(4)

r2 = cudf.DataFrame()
r2['b0'] = range(4)
r2['b1'] = range(4)

d1 = dask_cudf.from_cudf(r1, 2)
d2 = dask_cudf.from_cudf(r2, 2)

res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
len(res)
4

A string column also works correctly on the single-machine scheduler:

import cudf
import dask_cudf

r1 = cudf.DataFrame()
r1['a1'] = range(4)
r1['a2'] = range(4,8)
r1['a3'] = range(4)

r2 = cudf.DataFrame()
r2['b0'] = range(4)
r2['b1'] = range(4)
r2['b1'] = r2.b1.astype('str')


d1 = dask_cudf.from_cudf(r1, 2)
d2 = dask_cudf.from_cudf(r2, 2)

res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
len(res)
4

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 16 (11 by maintainers)

Most upvoted comments

The dispatch thing is a regression in Dask 2.7. I have a fix for it here: https://github.com/dask/dask/pull/5590