cudf: [BUG] `repartition` failing with multiple workers
Describe the bug
I am running into issues when I try to repartition with multiple workers (>= 4), even though I have enough memory; I am at just 43% capacity.
This seems to work fine if I have just 2 workers.
Steps/Code to reproduce bug
Helper Functions
import cudf
import dask
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import numpy as np
import pandas as pd
import io
n_cols = 40
#n_rows per worker
n_rows = 4_850_000
n_parts_per_worker = 8
# appx 33 Mill per worker
dtypes = dict(zip([str(i) for i in range(0,n_cols)],[np.int32]*(n_cols)))
df = pd.read_csv(io.StringIO(""),names=list(dtypes.keys()), dtype=dtypes)
meta_df = cudf.from_pandas(df)
#works with 2 workers but fails at 4
n_workers = 4
# Create Cluster
cluster = LocalCUDACluster(n_workers=n_workers)
client = Client(cluster)
## DataFrame Helper Function
def create_df(n_rows, n_cols):
    df = cudf.DataFrame()
    for col_id in range(0, n_cols):
        df[str(col_id)] = np.ones(shape=n_rows, dtype=np.int32)
    return df
Create dataframe
# Create Data Frame
parts = [dask.delayed(create_df)(n_rows,n_cols=n_cols) for i in range(0,n_workers*n_parts_per_worker)]
df = dask_cudf.from_delayed(parts,meta=meta_df)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))
! nvidia-smi
nvidia-smi output
Wed Jul 17 11:56:55 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.48 Driver Version: 410.48 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla P100-SXM2... On | 00000000:06:00.0 Off | 0 |
| N/A 35C P0 48W / 300W | 7104MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 1 Tesla P100-SXM2... On | 00000000:07:00.0 Off | 0 |
| N/A 35C P0 49W / 300W | 6757MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 2 Tesla P100-SXM2... On | 00000000:0A:00.0 Off | 0 |
| N/A 32C P0 46W / 300W | 6757MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 3 Tesla P100-SXM2... On | 00000000:0B:00.0 Off | 0 |
| N/A 32C P0 44W / 300W | 6757MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 4 Tesla P100-SXM2... On | 00000000:85:00.0 Off | 0 |
| N/A 32C P0 33W / 300W | 10MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 5 Tesla P100-SXM2... On | 00000000:86:00.0 Off | 0 |
| N/A 31C P0 34W / 300W | 10MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 6 Tesla P100-SXM2... On | 00000000:89:00.0 Off | 0 |
| N/A 31C P0 35W / 300W | 10MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 7 Tesla P100-SXM2... On | 00000000:8A:00.0 Off | 0 |
| N/A 32C P0 32W / 300W | 10MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
| 0 59424 C ...naconda3/envs/rapids_nightly/bin/python 347MiB |
| 0 59468 C ...naconda3/envs/rapids_nightly/bin/python 6747MiB |
| 1 59472 C ...naconda3/envs/rapids_nightly/bin/python 6747MiB |
| 2 59466 C ...naconda3/envs/rapids_nightly/bin/python 6747MiB |
| 3 59470 C ...naconda3/envs/rapids_nightly/bin/python 6747MiB |
+-----------------------------------------------------------------------------+
Repartition df
## repartition Df
## Run into OOM issues at repartition
df = df.repartition(npartitions=n_workers)
df = df.persist()
wait(df)
print("len of df = {:,}".format(len(df)))
Error Trace
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-3-219dd226ab30> in <module>
4 df = df.persist()
5 wait(df)
----> 6 print("len of df = {:,}".format(len(df)))
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/core.py in __len__(self)
510 def __len__(self):
511 return self.reduction(
--> 512 len, np.sum, token="len", meta=int, split_every=False
513 ).compute()
514
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
173 dask.base.compute
174 """
--> 175 (result,) = compute(self, traverse=False, **kwargs)
176 return result
177
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
444 keys = [x.__dask_keys__() for x in collections]
445 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446 results = schedule(dsk, keys, **kwargs)
447 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
448
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2525 should_rejoin = False
2526 try:
-> 2527 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2528 finally:
2529 for f in futures.values():
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1821 direct=direct,
1822 local_worker=local_worker,
-> 1823 asynchronous=asynchronous,
1824 )
1825
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
761 else:
762 return sync(
--> 763 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
764 )
765
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
330 e.wait(10)
331 if error[0]:
--> 332 six.reraise(*error[0])
333 else:
334 return result[0]
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/utils.py in f()
315 if callback_timeout is not None:
316 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317 result[0] = yield future
318 except Exception as exc:
319 error[0] = sys.exc_info()
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/tornado/gen.py in run(self)
740 if exc_info is not None:
741 try:
--> 742 yielded = self.gen.throw(*exc_info) # type: ignore
743 finally:
744 # Break up a reference to itself
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1678 exc = CancelledError(key)
1679 else:
-> 1680 six.reraise(type(exception), exception, traceback)
1681 raise exc
1682 if errors == "skip":
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
690 value = tp()
691 if value.__traceback__ is not tb:
--> 692 raise value.with_traceback(tb)
693 raise value
694 finally:
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask/dataframe/methods.py in concat()
341 func = concat_dispatch.dispatch(type(dfs[0]))
342 return func(
--> 343 dfs, axis=axis, join=join, uniform=uniform, filter_warning=filter_warning
344 )
345
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/dask_cudf/backends.py in concat_cudf()
31 assert axis == 0
32 assert join == "outer"
---> 33 return cudf.concat(dfs)
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/multi.py in concat()
52
53 if typ is DataFrame:
---> 54 return DataFrame._concat(objs, axis=axis, ignore_index=ignore_index)
55 elif typ is Series:
56 return Series._concat(objs, axis=axis)
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in _concat()
1444 data = [
1445 (c, Series._concat([o[c] for o in objs], index=index))
-> 1446 for c in objs[0].columns
1447 ]
1448 out = cls(data)
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/dataframe.py in <listcomp>()
1444 data = [
1445 (c, Series._concat([o[c] for o in objs], index=index))
-> 1446 for c in objs[0].columns
1447 ]
1448 out = cls(data)
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/series.py in _concat()
1035 else:
1036 name = None
-> 1037 col = Column._concat([o._column for o in objs])
1038 return cls(data=col, index=index, name=name)
1039
~/anaconda3/envs/rapids_nightly/lib/python3.7/site-packages/cudf/dataframe/column.py in _concat()
110 # Performance the actual concatenation
111 if newsize > 0:
--> 112 col = _column_concat(objs, col)
113
114 return col
cudf/bindings/concat.pyx in cudf.bindings.concat._column_concat()
RuntimeError: CUDA error encountered at: /conda/envs/gdf/conda-bld/libcudf_1563314405241/work/cpp/src/column/legacy/column.cpp:101: 11 cudaErrorInvalidValue invalid argument
Environment overview (please complete the following information)
- Method of cuDF install: [conda]
cudf 0.9.0a py37_1312 rapidsai-nightly
dask-cudf 0.9.0a1 py37_0 rapidsai-nightly/label/cuda10.0
libcudf 0.9.0a cuda10.0_1312 rapidsai-nightly
dask-cuda 0.9.0a0+17.g3057f94.dirty pypi_0 pypi
Edit: Added Nvidia-smi output.
Edit 2: Removed OOM from the heading as it seems to be unrelated. Sorry for the confusion.
About this issue
- Original URL
- State: closed
- Created 5 years ago
- Comments: 19 (10 by maintainers)
In your case, I’d recommend chunks in the gigabyte range, unless there is some very pressing need to go larger.
Also, I suspect that the conversation will be able to engage more people if it doesn’t include Dask. It would be interesting to see how cudf operations react as data sizes get larger. If you can show performance degradation with small chunks with just cudf then you’ll have a lot of people who can engage on the problem. If Dask is involved then it’s probably just a couple of us (most of whom are saturated).
Our general recommendations are here: https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions . Note that in your case I’m assuming that you’re running with one thread rather than 10, as in that example.
You should have space for a few times more chunks to be in memory than you have concurrent tasks.
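For reference, a back-of-the-envelope estimate of the partition sizes in the reproducer above, using only the numbers from the script (int32, i.e. 4 bytes per element):

# Rough partition-size arithmetic for the reproducer above.
n_rows, n_cols = 4_850_000, 40
n_parts_per_worker = 8

# One create_df call produces one input partition.
bytes_per_input_partition = n_rows * n_cols * 4                               # ~0.78 GB
# repartition(npartitions=4) coalesces 8 consecutive input partitions each.
bytes_per_output_partition = bytes_per_input_partition * n_parts_per_worker  # ~6.2 GB

print("input partition : {:.2f} GB".format(bytes_per_input_partition / 1e9))
print("output partition: {:.2f} GB".format(bytes_per_output_partition / 1e9))

Each output partition therefore concatenates roughly 6.2 GB of data in a single task on one 16 GB GPU that already holds its share of the persisted input.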
My understanding from @kkraus14 was that performance benefits should drop off a bit after you reach a large enough size to saturate all of the thread blocks. The estimate I was given was that this was likely to occur in the few hundred MB range. I don’t know what’s optimal though, it sounds like you have evidence against this.
Alright, I was able to reproduce this, and it was my mistake that I couldn’t before (I was only running the first two blocks of code, and missed the third one).
This is directly related to the issue in https://github.com/rapidsai/dask-cuda/issues/57. What happens here is that the worker crashes due to the amount of data. Setting `device_memory_limit` may help by spilling it to disk, but it will make it slower. I discussed offline with @VibhuJawa, and for such pipelines we have two options:
Both will incur overhead, and this may be very dependent on the algorithm in question, but I tend to believe that option 1 will perform better.
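For reference, a minimal sketch of enabling the device-to-host spilling mentioned above via dask-cuda's `device_memory_limit`; the threshold value is purely illustrative, not a recommendation from this thread:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

# Spill device buffers once a worker's GPU memory use crosses the limit.
# "10GB" is an example value; tune it to the GPU and workload.
cluster = LocalCUDACluster(n_workers=4, device_memory_limit="10GB")
client = Client(cluster)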
I have been working on benchmarking alternatives for spilling memory in https://github.com/rapidsai/dask-cuda/issues/92, but the outlook isn’t great. Besides the cost of copying the memory to host, there’s also a cost to serializing that memory. For an idea of the current status, spilling to host currently has a bandwidth of about 550 MB/s, in contrast to the real bandwidth of 6 GB/s we can achieve when using unpinned memory. I expect to be able to speed up serialization, but the actual spilling bandwidth will certainly be < 6 GB/s.
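A hedged micro-benchmark sketch of the raw device-to-host copy bandwidth quoted above (unpinned, i.e. pageable host memory). It uses numba.cuda, which ships in the RAPIDS environment; the buffer size and methodology are illustrative only:

import time
import numpy as np
from numba import cuda

nbytes = 1 << 30                             # 1 GiB of int8 data
d_arr = cuda.to_device(np.zeros(nbytes, dtype=np.int8))

cuda.synchronize()
start = time.perf_counter()
h_arr = d_arr.copy_to_host()                 # blocking copy into pageable host memory
elapsed = time.perf_counter() - start

print("~{:.2f} GB/s device-to-host (unpinned)".format(nbytes / elapsed / 1e9))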