cudf: [BUG] `repartition` failing on multiple-workers

Describe the bug

I run into issues when I try to repartition with multiple workers (>= 4), even though I have enough memory: I am at just 43% capacity.

This seems to work fine if I have just 2 workers.

Steps/Code to reproduce bug

Helper Functions

import cudf
import dask
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait

import numpy as np
import pandas as pd
import io


n_cols = 40
# n_rows per partition
n_rows = 4_850_000
n_parts_per_worker = 8
# approx. 38.8 million rows per worker (n_rows * n_parts_per_worker)

dtypes = dict(zip([str(i) for i in range(0,n_cols)],[np.int32]*(n_cols)))
df = pd.read_csv(io.StringIO(""),names=list(dtypes.keys()), dtype=dtypes)
meta_df = cudf.from_pandas(df)


#works with 2 workers but fails at 4
n_workers = 4
# Create Cluster
cluster = LocalCUDACluster(n_workers=n_workers)
client = Client(cluster)


## DataFrame Helper Function
def create_df(n_rows,n_cols):
    df=cudf.DataFrame()
    for col_id in range(0,n_cols):
        df[str(col_id)]= np.ones(shape=n_rows,dtype=np.int32)
    return df

Create dataframe

# Create Data Frame
parts = [dask.delayed(create_df)(n_rows,n_cols=n_cols) for i in range(0,n_workers*n_parts_per_worker)]
df = dask_cudf.from_delayed(parts,meta=meta_df)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))
! nvidia-smi

nvidia-smi output

Wed Jul 17 11:56:55 2019       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.48                 Driver Version: 410.48                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P100-SXM2...  On   | 00000000:06:00.0 Off |                    0 |
| N/A   35C    P0    48W / 300W |   7104MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla P100-SXM2...  On   | 00000000:07:00.0 Off |                    0 |
| N/A   35C    P0    49W / 300W |   6757MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  Tesla P100-SXM2...  On   | 00000000:0A:00.0 Off |                    0 |
| N/A   32C    P0    46W / 300W |   6757MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   3  Tesla P100-SXM2...  On   | 00000000:0B:00.0 Off |                    0 |
| N/A   32C    P0    44W / 300W |   6757MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   4  Tesla P100-SXM2...  On   | 00000000:85:00.0 Off |                    0 |
| N/A   32C    P0    33W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   5  Tesla P100-SXM2...  On   | 00000000:86:00.0 Off |                    0 |
| N/A   31C    P0    34W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   6  Tesla P100-SXM2...  On   | 00000000:89:00.0 Off |                    0 |
| N/A   31C    P0    35W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   7  Tesla P100-SXM2...  On   | 00000000:8A:00.0 Off |                    0 |
| N/A   32C    P0    32W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     59424      C   ...naconda3/envs/rapids_nightly/bin/python   347MiB |
|    0     59468      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
|    1     59472      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
|    2     59466      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
|    3     59470      C   ...naconda3/envs/rapids_nightly/bin/python  6747MiB |
+-----------------------------------------------------------------------------+

Repartition df

## repartition Df
## Run into OOM issues at repartition
df = df.repartition(npartitions=n_workers)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))

Error Trace

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-3-219dd226ab30> in <module>
      4 df = df.persist()
      5 wait(df)
----> 6 print("len of df = {:,}".format(len(df)))

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/core.py in __len__(self)
    510     def __len__(self):
    511         return self.reduction(
--> 512             len, np.sum, token="len", meta=int, split_every=False
    513         ).compute()
    514 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    173         dask.base.compute
    174         """
--> 175         (result,) = compute(self, traverse=False, **kwargs)
    176         return result
    177 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2525                     should_rejoin = False
   2526             try:
-> 2527                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2528             finally:
   2529                 for f in futures.values():

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1821                 direct=direct,
   1822                 local_worker=local_worker,
-> 1823                 asynchronous=asynchronous,
   1824             )
   1825 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    761         else:
    762             return sync(
--> 763                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    764             )
    765 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    330             e.wait(10)
    331     if error[0]:
--> 332         six.reraise(*error[0])
    333     else:
    334         return result[0]

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in f()
    315             if callback_timeout is not None:
    316                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317             result[0] = yield future
    318         except Exception as exc:
    319             error[0] = sys.exc_info()

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
    740                     if exc_info is not None:
    741                         try:
--> 742                             yielded = self.gen.throw(*exc_info)  # type: ignore
    743                         finally:
    744                             # Break up a reference to itself

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1678                             exc = CancelledError(key)
   1679                         else:
-> 1680                             six.reraise(type(exception), exception, traceback)
   1681                         raise exc
   1682                     if errors == "skip":

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/methods.py in concat()
    341         func = concat_dispatch.dispatch(type(dfs[0]))
    342         return func(
--> 343             dfs, axis=axis, join=join, uniform=uniform, filter_warning=filter_warning
    344         )
    345 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask_cudf/backends.py in concat_cudf()
     31     assert axis == 0
     32     assert join == "outer"
---> 33     return cudf.concat(dfs)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/multi.py in concat()
     52 
     53     if typ is DataFrame:
---> 54         return DataFrame._concat(objs, axis=axis, ignore_index=ignore_index)
     55     elif typ is Series:
     56         return Series._concat(objs, axis=axis)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in _concat()
   1444         data = [
   1445             (c, Series._concat([o[c] for o in objs], index=index))
-> 1446             for c in objs[0].columns
   1447         ]
   1448         out = cls(data)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in <listcomp>()
   1444         data = [
   1445             (c, Series._concat([o[c] for o in objs], index=index))
-> 1446             for c in objs[0].columns
   1447         ]
   1448         out = cls(data)

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/series.py in _concat()
   1035         else:
   1036             name = None
-> 1037         col = Column._concat([o._column for o in objs])
   1038         return cls(data=col, index=index, name=name)
   1039 

~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/column.py in _concat()
    110         # Performance the actual concatenation
    111         if newsize > 0:
--> 112             col = _column_concat(objs, col)
    113 
    114         return col

cudf/bindings/concat.pyx in cudf.bindings.concat._column_concat()

RuntimeError: CUDA error encountered at: /conda/envs/gdf/conda-bld/libcudf_1563314405241/work/cpp/src/column/legacy/column.cpp:101: 11 cudaErrorInvalidValue invalid argument

Environment overview (please complete the following information)

  • Method of cuDF install: [conda]
cudf                      0.9.0a                py37_1312    rapidsai-nightly
dask-cudf                 0.9.0a1                  py37_0    rapidsai-nightly/label/cuda10.0
libcudf                   0.9.0a            cuda10.0_1312    rapidsai-nightly
dask-cuda                 0.9.0a0+17.g3057f94.dirty          pypi_0    pypi

Edit: Added Nvidia-smi output.

Edit 2: Removed OOM from heading as this seems to be unrelated. Sorry for the confusion

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 19 (10 by maintainers)

Most upvoted comments

In your case, I’d recommend chunks in the gigabyte range, unless there is some very pressing need to go larger.
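As a rough illustration of that recommendation, the sketch below estimates partition sizes for the workload in the reproduction script and derives a partition count for a target chunk size. The 1 GB target is an assumed reading of "gigabyte range", and the helper names are hypothetical. The estimate counts raw int32 column data only and ignores index and allocator overhead.

```python
import math

# Workload figures taken from the reproduction script in this issue.
N_COLS = 40
N_ROWS_PER_PART = 4_850_000
N_WORKERS = 4
N_PARTS_PER_WORKER = 8
BYTES_PER_VALUE = 4  # np.int32

# Assumption: "gigabyte range" is read here as a 1 GB target (decimal units).
TARGET_CHUNK_BYTES = 1_000_000_000


def partition_bytes(n_rows: int, n_cols: int, itemsize: int) -> int:
    """Raw column data only; ignores index and allocator overhead."""
    return n_rows * n_cols * itemsize


def npartitions_for_target(total_bytes: int, target_bytes: int) -> int:
    """Smallest partition count that keeps each chunk at or below the target."""
    return max(1, math.ceil(total_bytes / target_bytes))


initial_parts = N_WORKERS * N_PARTS_PER_WORKER
part_bytes = partition_bytes(N_ROWS_PER_PART, N_COLS, BYTES_PER_VALUE)
total_bytes = part_bytes * initial_parts

# Size of each partition after df.repartition(npartitions=n_workers).
after_repartition_bytes = total_bytes // N_WORKERS

suggested = npartitions_for_target(total_bytes, TARGET_CHUNK_BYTES)

print(f"initial partition:      {part_bytes / 1e9:.2f} GB")
print(f"after repartition to {N_WORKERS}: {after_repartition_bytes / 1e9:.2f} GB")
print(f"partitions for ~1 GB:   {suggested}")
```

Under these assumptions the original 32 partitions are already close to the suggested range, while repartitioning down to one partition per worker produces chunks several times larger.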

Also, I suspect that the conversation will be able to engage more people if it doesn’t include Dask. It would be interesting to see how cudf operations react as data sizes get larger. If you can show performance degradation with small chunks with just cudf then you’ll have a lot of people who can engage on the problem. If Dask is involved then it’s probably just a couple of us (most of whom are saturated).

Our general recommendations are here: https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions . Note that in your case I’m assuming that you’re running with one thread rather than 10, as in that example.

You should have space for a few times more chunks to be in memory than you have concurrent tasks.
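A minimal sketch of that rule of thumb follows. It assumes "a few times" means a factor of 3 and one concurrent task per worker (both are assumptions, not figures from the thread), and it takes the 16280 MiB device size from the nvidia-smi output above. The function names are hypothetical, and the check is a sizing heuristic only, not a diagnosis of the reported error.

```python
# Device memory per GPU, from the nvidia-smi output (16280 MiB).
DEVICE_BYTES = 16280 * 1024 * 1024

# Raw int32 data per partition: rows * columns * 4 bytes.
SMALL_CHUNK_BYTES = 4_850_000 * 40 * 4       # one original partition
LARGE_CHUNK_BYTES = SMALL_CHUNK_BYTES * 8    # after merging 8 partitions into 1

# Assumptions: one task per worker, and "a few times" read as 3x.
CONCURRENT_TASKS = 1
HEADROOM_FACTOR = 3


def chunks_that_fit(device_bytes: int, chunk_bytes: int) -> int:
    """Number of whole chunks that fit in device memory (raw data only)."""
    return device_bytes // chunk_bytes


def has_headroom(device_bytes: int, chunk_bytes: int,
                 concurrent_tasks: int, factor: int) -> bool:
    """True if memory holds at least factor * concurrent_tasks chunks."""
    return chunks_that_fit(device_bytes, chunk_bytes) >= factor * concurrent_tasks


for label, chunk in [("original", SMALL_CHUNK_BYTES), ("repartitioned", LARGE_CHUNK_BYTES)]:
    fits = chunks_that_fit(DEVICE_BYTES, chunk)
    ok = has_headroom(DEVICE_BYTES, chunk, CONCURRENT_TASKS, HEADROOM_FACTOR)
    print(f"{label}: {fits} chunks fit, headroom ok = {ok}")
```

With these numbers the original partitions leave ample headroom, whereas the repartitioned chunks do not meet the assumed 3x margin on a single GPU.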

My understanding from @kkraus14 was that performance benefits should drop off a bit after you reach a large enough size to saturate all of the thread blocks. The estimate I was given was that this was likely to occur in the few hundred MB range. I don’t know what’s optimal, though; it sounds like you have evidence against this.

Alright, I was able to reproduce this, and it was my mistake that I couldn’t before (I was only running the first two blocks of code, and missed the third one).

This is directly related to the issue in https://github.com/rapidsai/dask-cuda/issues/57. What happens here is that the worker crashes due to the amount of data. Setting device_memory_limit may help by spilling data out of device memory to host (and from there to disk), but it will make things slower.

I discussed offline with @VibhuJawa, and for such pipelines we have two options:

  1. Paying the price of working with smaller chunks; or
  2. Paying the price of spilling data to host more often.

Both will incur overhead, and this may be very dependent on the algorithm in question, but I tend to believe that option 1 will perform better.

I have been working on benchmarking alternatives for spilling memory in https://github.com/rapidsai/dask-cuda/issues/92, but the outlook isn’t great. Besides the cost of copying the memory to host, there’s also a cost in serializing that memory. For an idea of the current status, spilling to host currently has a bandwidth of about 550 MB/s, in contrast to the real bandwidth we can achieve of 6 GB/s when using unpinned memory. I expect to be able to speed up serialization, but the actual spilling bandwidth will certainly be < 6 GB/s.
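To put those bandwidth figures in perspective, here is a back-of-the-envelope sketch of how long it would take to spill one repartitioned chunk from this issue's workload. It assumes decimal units (1 MB = 10^6 bytes), counts raw int32 data only, and treats the quoted bandwidths as sustained rates. The function name is hypothetical.

```python
# One partition after repartition(npartitions=n_workers):
# 8 original partitions * 4,850,000 rows * 40 columns * 4 bytes (int32).
CHUNK_BYTES = 8 * 4_850_000 * 40 * 4

# Bandwidths quoted in the comment above, in bytes per second (decimal units).
SPILL_BANDWIDTH = 550e6   # measured spilling to host, including serialization
COPY_BANDWIDTH = 6e9      # achievable copy bandwidth with unpinned memory


def transfer_seconds(n_bytes: float, bytes_per_second: float) -> float:
    """Idealized transfer time at a constant bandwidth."""
    return n_bytes / bytes_per_second


spill_s = transfer_seconds(CHUNK_BYTES, SPILL_BANDWIDTH)
copy_s = transfer_seconds(CHUNK_BYTES, COPY_BANDWIDTH)

print(f"chunk size:           {CHUNK_BYTES / 1e9:.2f} GB")
print(f"spill at 550 MB/s:    {spill_s:.1f} s")
print(f"raw copy at 6 GB/s:   {copy_s:.1f} s")
print(f"slowdown vs raw copy: {spill_s / copy_s:.1f}x")
```

The ratio between the two estimates depends only on the quoted bandwidths, so the same slowdown factor applies to any chunk size under these assumptions.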