cudf: [BUG] Dask distributed scheduler merges fail by adding None column
When using the distributed scheduler, merges that include a string column cause a None to be introduced in the intermediate process, causing the merges to fail.
ValueError: The columns in the computed data do not match the columns in the provided metadata
Expected: ['b0', 'b1']
Actual: ['b0', None]
This does not happen when there are only numeric columns, nor does it happen with the distributed scheduler when running Dask on CPU (non-GPU) locally. When running on the single-machine scheduler, there are no errors.
Environment: Ubuntu 18.04, CUDA 10.1, in a rapidsai-dev-nightly container from 11/12/19
Distributed Scheduler
String column causes the error:
import cudf
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
cluster = LocalCUDACluster()
client = Client(cluster)
r1 = cudf.DataFrame()
r1['a1'] = range(4)
r1['a2'] = range(4,8)
r1['a3'] = range(4)
r2 = cudf.DataFrame()
r2['b0'] = range(4)
r2['b1'] = range(4)
r2['b1'] = r2.b1.astype('str')
d1 = dask_cudf.from_cudf(r1, 2)
d2 = dask_cudf.from_cudf(r2, 2)
res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
len(res)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-5-e523a3de6bef> in <module>
14
15 res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
---> 16 len(res)
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in __len__(self)
502 def __len__(self):
503 return self.reduction(
--> 504 len, np.sum, token="len", meta=int, split_every=False
505 ).compute()
506
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
163 dask.base.compute
164 """
--> 165 (result,) = compute(self, traverse=False, **kwargs)
166 return result
167
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
434 keys = [x.__dask_keys__() for x in collections]
435 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436 results = schedule(dsk, keys, **kwargs)
437 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
438
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2543 should_rejoin = False
2544 try:
-> 2545 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2546 finally:
2547 for f in futures.values():
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1843 direct=direct,
1844 local_worker=local_worker,
-> 1845 asynchronous=asynchronous,
1846 )
1847
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
760 else:
761 return sync(
--> 762 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
763 )
764
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
331 if error[0]:
332 typ, exc, tb = error[0]
--> 333 raise exc.with_traceback(tb)
334 else:
335 return result[0]
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in f()
315 if callback_timeout is not None:
316 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317 result[0] = yield future
318 except Exception as exc:
319 error[0] = sys.exc_info()
/opt/conda/envs/rapids/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/opt/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1699 exc = CancelledError(key)
1700 else:
-> 1701 raise exception.with_traceback(traceback)
1702 raise exc
1703 if errors == "skip":
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/optimization.py in __call__()
974 if not len(args) == len(self.inkeys):
975 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 976 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
977
978 def __reduce__(self):
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/core.py in get()
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/core.py in _execute_task()
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/utils.py in apply()
27 def apply(func, args, kwargs=None):
28 if kwargs:
---> 29 return func(*args, **kwargs)
30 else:
31 return func(*args)
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in apply_and_enforce()
4851 " the columns in the provided metadata\n"
4852 " Expected: %s\n"
-> 4853 " Actual: %s" % (str(list(meta.columns)), str(list(df.columns)))
4854 )
4855 else:
ValueError: The columns in the computed data do not match the columns in the provided metadata
Expected: ['b0', 'b1']
Actual: ['b0', None]
Works fine with only numeric columns
import cudf
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
cluster = LocalCUDACluster()
client = Client(cluster)
r1 = cudf.DataFrame()
r1['a1'] = range(4)
r1['a2'] = range(4,8)
r1['a3'] = range(4)
r2 = cudf.DataFrame()
r2['b0'] = range(4)
r2['b1'] = range(4)
d1 = dask_cudf.from_cudf(r1, 2)
d2 = dask_cudf.from_cudf(r2, 2)
res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
len(res)
4
Single Machine Scheduler
import cudf
import dask_cudf
r1 = cudf.DataFrame()
r1['a1'] = range(4)
r1['a2'] = range(4,8)
r1['a3'] = range(4)
r2 = cudf.DataFrame()
r2['b0'] = range(4)
r2['b1'] = range(4)
d1 = dask_cudf.from_cudf(r1, 2)
d2 = dask_cudf.from_cudf(r2, 2)
res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
len(res)
4
import cudf
import dask_cudf
r1 = cudf.DataFrame()
r1['a1'] = range(4)
r1['a2'] = range(4,8)
r1['a3'] = range(4)
r2 = cudf.DataFrame()
r2['b0'] = range(4)
r2['b1'] = range(4)
r2['b1'] = r2.b1.astype('str')
d1 = dask_cudf.from_cudf(r1, 2)
d2 = dask_cudf.from_cudf(r2, 2)
res = d1.merge(d2, left_on=['a3'], right_on=['b0'])
len(res)
4
About this issue
- Original URL
- State: closed
- Created 5 years ago
- Comments: 16 (11 by maintainers)
The dispatch thing is a regression in Dask 2.7. I have a fix for it here: https://github.com/dask/dask/pull/5590