dask-cuda: Out of Memory Sort Fails even with Spill over
An out-of-memory sort still seems to fail even with the device spill PR merged (https://github.com/rapidsai/dask-cuda/pull/51).
Device memory still appears to grow linearly, which eventually causes RuntimeError: parallel_for failed: out of memory.
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf, dask_cudf

# Use dask-cuda to start one worker per GPU on a single-node system.
# When you shut down this notebook kernel, the Dask cluster also shuts down.
cluster = LocalCUDACluster(ip='0.0.0.0', n_workers=1, device_memory_limit='10000 MiB')
client = Client(cluster)

# print client info
print(client)

# Code to simulate data
def generate_file(output_file, rows=100):
    with open(output_file, 'wb') as f:
        f.write(b'A,B,C,D,E,F,G,H,I,J,K\n')
        # Each write below contains two rows, so repeat rows // 2 times.
        f.write(b'22,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n23,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n' * (rows // 2))

# generate the test file
output_file = 'test.csv'
generate_file(output_file, rows=100_000_000)

# read it using dask_cudf
df = dask_cudf.read_csv(output_file, chunksize='100 MiB')
print(df.head(10).to_pandas())

# sort it (this is where the out-of-memory error occurs)
df = df.sort_values(['A', 'B', 'C'])
Error trace:
--------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-1-62d876400539> in <module>
30
31 # reading it using dask_cudf
---> 32 df = df.sort_values(['A','B','C'])
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask_cudf/core.py in sort_values(self, by, ignore_index)
440 """
441 parts = self.to_delayed()
--> 442 sorted_parts = batcher_sortnet.sort_delayed_frame(parts, by)
443 return from_delayed(sorted_parts, meta=self._meta).reset_index(
444 force=not ignore_index
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask_cudf/batcher_sortnet.py in sort_delayed_frame(parts, by)
133 list(map(delayed(lambda x: int(x is not None)), parts[:valid]))
134 )
--> 135 valid = compute(valid_ct)[0]
136 validparts = parts[:valid]
137 return validparts
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2566 should_rejoin = False
2567 try:
-> 2568 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2569 finally:
2570 for f in futures.values():
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1820 direct=direct,
1821 local_worker=local_worker,
-> 1822 asynchronous=asynchronous,
1823 )
1824
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
751 return future
752 else:
--> 753 return sync(self.loop, func, *args, **kwargs)
754
755 def __repr__(self):
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
329 e.wait(10)
330 if error[0]:
--> 331 six.reraise(*error[0])
332 else:
333 return result[0]
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/utils.py in f()
314 if timeout is not None:
315 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 316 result[0] = yield future
317 except Exception as exc:
318 error[0] = sys.exc_info()
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
727
728 try:
--> 729 value = future.result()
730 except Exception:
731 exc_info = sys.exc_info()
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
734 if exc_info is not None:
735 try:
--> 736 yielded = self.gen.throw(*exc_info) # type: ignore
737 finally:
738 # Break up a reference to itself
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1651 six.reraise(CancelledError, CancelledError(key), None)
1652 else:
-> 1653 six.reraise(type(exception), exception, traceback)
1654 if errors == "skip":
1655 bad_keys.add(key)
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
690 value = tp()
691 if value.__traceback__ is not tb:
--> 692 raise value.with_traceback(tb)
693 raise value
694 finally:
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask/compatibility.py in apply()
91 def apply(func, args, kwargs=None):
92 if kwargs:
---> 93 return func(*args, **kwargs)
94 else:
95 return func(*args)
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask_cudf/batcher_sortnet.py in _compare_frame()
72 if a is not None and b is not None:
73 joint = gd.concat([a, b])
---> 74 sorten = joint.sort_values(by=by)
75 # Split the sorted frame using the *max_part_size*
76 lhs, rhs = sorten[:max_part_size], sorten[max_part_size:]
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/cudf/dataframe/dataframe.py in sort_values()
1279 return self._sort_by(self[by].argsort(
1280 ascending=ascending,
-> 1281 na_position=na_position)
1282 )
1283
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/cudf/dataframe/dataframe.py in _sort_by()
1225 # Perform out = data[index] for all columns
1226 for k in self.columns:
-> 1227 df[k] = self[k].take(sorted_indices.to_gpu_array())
1228 df.index = self.index.take(sorted_indices.to_gpu_array())
1229 return df
~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/cudf/dataframe/series.py in take()
324 return self[indices]
325
--> 326 col = cpp_copying.apply_gather_array(self.data.to_gpu_array(), indices)
327
328 if self._column.mask:
cudf/bindings/copying.pyx in cudf.bindings.copying.apply_gather_array()
cudf/bindings/copying.pyx in cudf.bindings.copying.apply_gather_column()
cudf/bindings/copying.pyx in cudf.bindings.copying.apply_gather()
RuntimeError: parallel_for failed: out of memory
About this issue
- State: closed
- Created 5 years ago
- Comments: 38 (30 by maintainers)
@pentschev, yup, with all the TPCx-BB work this has indeed improved a lot. This is good to close in my book too.
@jakirkham @kkraus14 and I were discussing this offline, so to set expectations straight:
“To share device memory pointers and events across processes, an application must use the Inter Process Communication API, which is described in detail in the reference manual. The IPC API is only supported for 64-bit processes on Linux and for devices of compute capability 2.0 and higher. Note that the IPC API is not supported for cudaMallocManaged allocations.”
Reference: https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#interprocess-communication
@jakirkham yes, the RMM documentation has details on that: https://github.com/rapidsai/rmm#cuda-managed-memory
Note that with this PR from @shwina, it’s even easier to set the allocator mode: https://github.com/rapidsai/cudf/pull/2682
It should be as easy as just doing:
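A minimal sketch of that, assuming the cudf.set_allocator API that PR introduces (the exact argument below is an assumption, not copied from the PR):

import cudf

# Assumed API from rapidsai/cudf#2682: switch RMM to CUDA managed memory
# so device allocations can oversubscribe physical GPU memory and be paged.
cudf.set_allocator("managed")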
What I found out is that memory really is the issue. In the case described here, the GPU has 16 GB of memory. Trying that example with device_memory_limit='10000 MiB' fails, and just before failing the real GPU memory utilization was at 16 GB, despite the LRU being under 10 GB. Reducing to device_memory_limit='5000 MiB' completes successfully, but takes 120 minutes. Raising back to device_memory_limit='10000 MiB' but reducing from chunksize='4096 MiB' to chunksize='1024 MiB' also finishes here, taking 71 minutes.
So what's happening is that cuDF allocates additional memory proportional to the chunksize, which makes sense. As of now, I don't know if there's a better/safer way of keeping track of the entire device memory (including that managed by cuDF), so I don't see another working solution for now other than having smaller chunks.
On side channels @VibhuJawa pointed out that the chunk sizes have a non-negligible impact on performance, so this is definitely something we want to improve in the future, but for the time being, using smaller chunk sizes is the only solution here.
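In practice the workaround amounts to shrinking the read chunks in the reproducer above; a minimal sketch using the 1024 MiB chunk size from the timings above (same cluster settings otherwise):

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask_cudf

# Keep the 10000 MiB spill threshold, but use smaller read chunks so each
# per-partition cuDF sort/concat allocates proportionally less device memory.
cluster = LocalCUDACluster(ip='0.0.0.0', n_workers=1,
                           device_memory_limit='10000 MiB')
client = Client(cluster)

df = dask_cudf.read_csv('test.csv', chunksize='1024 MiB')
df = df.sort_values(['A', 'B', 'C'])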