dask-cuda: Client.rebalance fails on dgx-2 with LocalCUDACluster
We currently hit an error when we try to rebalance a dataframe (both dask-cudf and pure dask) on a LocalCUDACluster on dgx-2.
Context:
- This works fine on dgx-1.
- This works fine with LocalCluster.
I tested it on both exp-01 and dgx-202.
Minimal Gist: https://gist.github.com/VibhuJawa/19c2cc886f6310d2b468671079014fe6
Imports
import cudf
import dask_cudf
from dask import delayed
from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client, wait, LocalCluster
# below works
# cluster = LocalCluster()
# below fails
cluster = LocalCUDACluster()
client = Client(cluster)
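A quick sanity check (not in the original gist): LocalCUDACluster starts one worker per visible GPU by default, so on a DGX-2 this should report 16 workers.

# Sanity check (not in the original gist): one worker per GPU is
# expected from LocalCUDACluster, i.e. 16 workers on a DGX-2.
print(len(client.scheduler_info()["workers"]))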
Write test data to disk
cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')
Rebalancing Fails on dgx-2
### read written dataset
df = dd.read_parquet('test_data/*.parquet')
# df = dask_cudf.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)
### below fails for us
client.rebalance(df)
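Before the failing call, the key placement can be inspected with Client.has_what (a diagnostic sketch, not part of the original gist):

# Optional diagnostic (not in the original gist): print how many keys
# each worker holds before rebalancing, to confirm the imbalance.
for worker, keys in client.has_what().items():
    print(worker, len(keys))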
Error Trace
distributed.utils - ERROR - can only concatenate list (not "dict") to list
Traceback (most recent call last):
File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
yield
File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py", line 3058, in rebalance
"keys": sum([r["keys"] for r in result if "keys" in r], []),
TypeError: can only concatenate list (not "dict") to list
distributed.core - ERROR - can only concatenate list (not "dict") to list
Traceback (most recent call last):
File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
result = await result
File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py", line 3058, in rebalance
"keys": sum([r["keys"] for r in result if "keys" in r], []),
TypeError: can only concatenate list (not "dict") to list
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-4-2b5d4120310b> in <module>
7
8 ### below fails for us
----> 9 client.rebalance(df)
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in rebalance(self, futures, workers, **kwargs)
3024 A list of workers on which to balance, defaults to all workers
3025 """
-> 3026 return self.sync(self._rebalance, futures, workers, **kwargs)
3027
3028 async def _replicate(self, futures, n=None, workers=None, branching_factor=2):
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
765 else:
766 return sync(
--> 767 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
768 )
769
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
343 if error[0]:
344 typ, exc, tb = error[0]
--> 345 raise exc.with_traceback(tb)
346 else:
347 return result[0]
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py in f()
327 if callback_timeout is not None:
328 future = asyncio.wait_for(future, callback_timeout)
--> 329 result[0] = yield future
330 except Exception as exc:
331 error[0] = sys.exc_info()
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in _rebalance(self, futures, workers)
3003 await _wait(futures)
3004 keys = list({tokey(f.key) for f in self.futures_of(futures)})
-> 3005 result = await self.scheduler.rebalance(keys=keys, workers=workers)
3006 assert result["status"] == "OK"
3007
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
755 name, comm.name = comm.name, "ConnectionPool." + key
756 try:
--> 757 result = await send_recv(comm=comm, op=key, **kwargs)
758 finally:
759 self.pool.reuse(self.addr, comm)
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
554 if comm.deserialize:
555 typ, exc, tb = clean_exception(**response)
--> 556 raise exc.with_traceback(tb)
557 else:
558 raise Exception(response["text"])
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in handle_comm()
410 result = asyncio.ensure_future(result)
411 self._ongoing_coroutines.add(result)
--> 412 result = await result
413 except (CommClosedError, CancelledError) as e:
414 if self.status == "running":
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py in rebalance()
3056 return {
3057 "status": "missing-data",
-> 3058 "keys": sum([r["keys"] for r in result if "keys" in r], []),
3059 }
3060
TypeError: can only concatenate list (not "dict") to list
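For context on the message itself: the failing scheduler line sums the r["keys"] values into a flat list, which only works if every value is a list. A dict value reproduces the exact TypeError in plain Python (minimal sketch; the worker-reply shape below is hypothetical, not taken from the scheduler):

# Minimal sketch of the Python-level failure: sum(..., []) concatenates
# with list.__add__, so a dict under "keys" raises the same TypeError.
result = [{"keys": {"some-key": "some-worker"}}]  # hypothetical worker reply
try:
    sum([r["keys"] for r in result if "keys" in r], [])
except TypeError as err:
    print(err)  # can only concatenate list (not "dict") to list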
Environment Details:
Environment Created Using:
conda create -n cudf_13_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7
Conda Environment:
dask 2.10.1 py_0 conda-forge
dask-core 2.10.1 py_0 conda-forge
dask-cuda 0.13.0b200213 py37_12 rapidsai-nightly
dask-cudf 0.13.0a200213 py37_1710 rapidsai-nightly
FWIW that change is in Distributed 2.15.0+
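If that is the relevant fix, a version guard like the following could confirm whether a given environment is affected (a sketch; the 2.15.0 threshold comes from the comment above, not from a verified changelog entry):

import distributed
from packaging.version import parse

# Sketch: warn when running a distributed release that predates the
# fix mentioned above (2.15.0, per the comment; treat as an assumption).
if parse(distributed.__version__) < parse("2.15.0"):
    print("distributed", distributed.__version__,
          "predates 2.15.0; Client.rebalance may hit this TypeError")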
I tried the same thing on my machine and got the same error. Environment:
conda create -n cudf_14_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7
Interestingly, changing the number of partitions to another number (like 15)
dask_cudf.from_cudf(cdf, npartitions=15).to_parquet('test_data')
resulted in no error.