dask-cuda: Client.rebalance fails on dgx-2 with LocalCUDACluster

We currently hit an error when we try to rebalance a dataframe (both dask-cudf and pure dask) on a LocalCUDACluster on a DGX-2.

Context:

  • This works fine on dgx-1
  • This works fine with LocalCluster

I tested it on both exp-01 and dgx-202.

Minimal Gist: https://gist.github.com/VibhuJawa/19c2cc886f6310d2b468671079014fe6

Imports

import cudf
import dask_cudf
from dask import delayed

from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client, wait, LocalCluster

# below works
# cluster = LocalCluster()

# below fails
cluster = LocalCUDACluster()
client = Client(cluster)

Write test data to disk

cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')

Rebalancing Fails on dgx-2

### read written dataset
df = dd.read_parquet('test_data/*.parquet')
# df = dask_cudf.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)

### below fails for us
client.rebalance(df)
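
As a side note (not part of the original repro), it can help to confirm where the persisted partitions actually live before rebalancing; a small sketch using the standard Client.who_has API:

# Optional diagnostic: show which worker holds each persisted partition key
from distributed import futures_of
print(client.who_has(futures_of(df)))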

Error Trace


distributed.utils - ERROR - can only concatenate list (not "dict") to list
Traceback (most recent call last):
  File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py", line 3058, in rebalance
    "keys": sum([r["keys"] for r in result if "keys" in r], []),
TypeError: can only concatenate list (not "dict") to list
distributed.core - ERROR - can only concatenate list (not "dict") to list
Traceback (most recent call last):
  File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py", line 3058, in rebalance
    "keys": sum([r["keys"] for r in result if "keys" in r], []),
TypeError: can only concatenate list (not "dict") to list
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-4-2b5d4120310b> in <module>
      7 
      8 ### below fails for us
----> 9 client.rebalance(df)

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in rebalance(self, futures, workers, **kwargs)
   3024             A list of workers on which to balance, defaults to all workers
   3025         """
-> 3026         return self.sync(self._rebalance, futures, workers, **kwargs)
   3027 
   3028     async def _replicate(self, futures, n=None, workers=None, branching_factor=2):

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    765         else:
    766             return sync(
--> 767                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    768             )
    769 

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    343     if error[0]:
    344         typ, exc, tb = error[0]
--> 345         raise exc.with_traceback(tb)
    346     else:
    347         return result[0]

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py in f()
    327             if callback_timeout is not None:
    328                 future = asyncio.wait_for(future, callback_timeout)
--> 329             result[0] = yield future
    330         except Exception as exc:
    331             error[0] = sys.exc_info()

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in _rebalance(self, futures, workers)
   3003         await _wait(futures)
   3004         keys = list({tokey(f.key) for f in self.futures_of(futures)})
-> 3005         result = await self.scheduler.rebalance(keys=keys, workers=workers)
   3006         assert result["status"] == "OK"
   3007 

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    755             name, comm.name = comm.name, "ConnectionPool." + key
    756             try:
--> 757                 result = await send_recv(comm=comm, op=key, **kwargs)
    758             finally:
    759                 self.pool.reuse(self.addr, comm)

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    554         if comm.deserialize:
    555             typ, exc, tb = clean_exception(**response)
--> 556             raise exc.with_traceback(tb)
    557         else:
    558             raise Exception(response["text"])

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in handle_comm()
    410                             result = asyncio.ensure_future(result)
    411                             self._ongoing_coroutines.add(result)
--> 412                             result = await result
    413                     except (CommClosedError, CancelledError) as e:
    414                         if self.status == "running":

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py in rebalance()
   3056                     return {
   3057                         "status": "missing-data",
-> 3058                         "keys": sum([r["keys"] for r in result if "keys" in r], []),
   3059                     }
   3060 

TypeError: can only concatenate list (not "dict") to list
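
For context on the failing line: the scheduler's missing-data branch assumes every worker response carries "keys" as a list, so summing with a list start value breaks as soon as one response returns a dict instead. A minimal standalone sketch of that failure (the result value below is illustrative, not captured from the cluster):

# One response has "keys" as a list, another as a dict:
# [] + ["key-0"] works, then + {"key-1": 1} raises the TypeError above
result = [{"keys": ["key-0"]}, {"keys": {"key-1": 1}}]
sum([r["keys"] for r in result if "keys" in r], [])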

Environment Details:

Environment Created Using:

conda create -n cudf_13_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7

Conda Environment:

dask                      2.10.1                     py_0    conda-forge
dask-core                 2.10.1                     py_0    conda-forge
dask-cuda                 0.13.0b200213           py37_12    rapidsai-nightly
dask-cudf                 0.13.0a200213         py37_1710    rapidsai-nightly

About this issue

  • State: open
  • Created 4 years ago
  • Comments: 34 (32 by maintainers)

Most upvoted comments

FWIW that change is in Distributed 2.15.0+

So even with a fresh, clean environment (without dask state files), I still get the error.

I ran it on exp-01.

On today's nightly: https://gist.github.com/VibhuJawa/20e4eb194ef95a7c17dbc01c850028ed
On yesterday's nightly: https://gist.github.com/VibhuJawa/10d850874bf3f96e7adf62dff7251efb

Running the script below:

import cudf
import dask_cudf
from dask import delayed
from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client, wait, LocalCluster

cluster = LocalCUDACluster(local_directory="/tmp")
client = Client(cluster)

cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')

df = dd.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)

import distributed
client.rebalance(futures=distributed.futures_of(df))

The bug might be non-deterministic, and may be happening more on exp-01 than on dgx-202; I will run the script a couple of times on both and report results once I get a dgx-2 free again.
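
For reference, a sketch of how the repeated runs could be scripted (the loop and failure counter are illustrative, not part of the script above):

# Repeat the persist + rebalance step several times to see how often it fails
import distributed

failures, attempts = 0, 10
for attempt in range(attempts):
    df = dd.read_parquet('test_data/*.parquet')
    df = df.repartition(npartitions=n_workers).persist()
    _ = wait(df)
    try:
        client.rebalance(futures=distributed.futures_of(df))
    except TypeError as e:
        failures += 1
        print(f"attempt {attempt}: FAILED: {e}")
print(f"{failures}/{attempts} attempts failed")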

I tried the same thing on my machine and got the same error. Environment: conda create -n cudf_14_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7

Interestingly, changing the number of partitions to a different value, e.g. 15 (dask_cudf.from_cudf(cdf, npartitions=15).to_parquet('test_data')), resulted in no error.
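
A similar sketch to probe whether the error depends on the partition count (the set of counts and the paths below are assumptions for illustration):

# Write the timeseries with several partition counts and try rebalance on each
import distributed

for nparts in (3, 15, 16):
    path = f'test_data_{nparts}'
    dask_cudf.from_cudf(cdf, npartitions=nparts).to_parquet(path)
    df = dd.read_parquet(f'{path}/*.parquet').repartition(npartitions=n_workers).persist()
    _ = wait(df)
    try:
        client.rebalance(futures=distributed.futures_of(df))
        print(nparts, 'OK')
    except TypeError as e:
        print(nparts, 'FAILED:', e)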