dask-cuda: Out-of-memory sort fails even with spill-over

Out-of-memory sort still seems to be failing even with the device spill PR (https://github.com/rapidsai/dask-cuda/pull/51) merged.

Device memory still appears to grow linearly, which eventually causes RuntimeError: parallel_for failed: out of memory.

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf, dask_cudf

# Use dask-cuda to start one worker per GPU on a single-node system
# When you shutdown this notebook kernel, the Dask cluster also shuts down.
cluster = LocalCUDACluster(ip='0.0.0.0', n_workers=1, device_memory_limit='10000 MiB')
client = Client(cluster)
# print client info
print(client)

# Code to simulate data

def generate_file(output_file, rows=100):
    # Write a CSV header plus `rows` rows of fixed sample data
    with open(output_file, 'wb') as f:
        f.write(b'A,B,C,D,E,F,G,H,I,J,K\n')
        f.write(b'22,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n23,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n' * (rows // 2))

# generate the test file (100 million rows)
output_file = 'test.csv'
generate_file(output_file, rows=100_000_000)

# reading it using dask_cudf
df = dask_cudf.read_csv(output_file,chunksize='100 MiB')
print(df.head(10).to_pandas())


# sort the dataframe on multiple columns
df = df.sort_values(['A', 'B', 'C'])

Error trace:

--------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-1-62d876400539> in <module>
     30 
     31 # reading it using dask_cudf
---> 32 df = df.sort_values(['A','B','C'])

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask_cudf/core.py in sort_values(self, by, ignore_index)
    440         """
    441         parts = self.to_delayed()
--> 442         sorted_parts = batcher_sortnet.sort_delayed_frame(parts, by)
    443         return from_delayed(sorted_parts, meta=self._meta).reset_index(
    444             force=not ignore_index

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask_cudf/batcher_sortnet.py in sort_delayed_frame(parts, by)
    133         list(map(delayed(lambda x: int(x is not None)), parts[:valid]))
    134     )
--> 135     valid = compute(valid_ct)[0]
    136     validparts = parts[:valid]
    137     return validparts

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2566                     should_rejoin = False
   2567             try:
-> 2568                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2569             finally:
   2570                 for f in futures.values():

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1820                 direct=direct,
   1821                 local_worker=local_worker,
-> 1822                 asynchronous=asynchronous,
   1823             )
   1824 

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    751             return future
    752         else:
--> 753             return sync(self.loop, func, *args, **kwargs)
    754 
    755     def __repr__(self):

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    329             e.wait(10)
    330     if error[0]:
--> 331         six.reraise(*error[0])
    332     else:
    333         return result[0]

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/utils.py in f()
    314             if timeout is not None:
    315                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 316             result[0] = yield future
    317         except Exception as exc:
    318             error[0] = sys.exc_info()

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
    727 
    728                     try:
--> 729                         value = future.result()
    730                     except Exception:
    731                         exc_info = sys.exc_info()

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
    734                     if exc_info is not None:
    735                         try:
--> 736                             yielded = self.gen.throw(*exc_info)  # type: ignore
    737                         finally:
    738                             # Break up a reference to itself

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1651                             six.reraise(CancelledError, CancelledError(key), None)
   1652                         else:
-> 1653                             six.reraise(type(exception), exception, traceback)
   1654                     if errors == "skip":
   1655                         bad_keys.add(key)

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask/compatibility.py in apply()
     91     def apply(func, args, kwargs=None):
     92         if kwargs:
---> 93             return func(*args, **kwargs)
     94         else:
     95             return func(*args)

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/dask_cudf/batcher_sortnet.py in _compare_frame()
     72     if a is not None and b is not None:
     73         joint = gd.concat([a, b])
---> 74         sorten = joint.sort_values(by=by)
     75         # Split the sorted frame using the *max_part_size*
     76         lhs, rhs = sorten[:max_part_size], sorten[max_part_size:]

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/cudf/dataframe/dataframe.py in sort_values()
   1279         return self._sort_by(self[by].argsort(
   1280             ascending=ascending,
-> 1281             na_position=na_position)
   1282         )
   1283 

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/cudf/dataframe/dataframe.py in _sort_by()
   1225         # Perform out = data[index] for all columns
   1226         for k in self.columns:
-> 1227             df[k] = self[k].take(sorted_indices.to_gpu_array())
   1228         df.index = self.index.take(sorted_indices.to_gpu_array())
   1229         return df

~/anaconda3/envs/py_36_rapids/lib/python3.6/site-packages/cudf/dataframe/series.py in take()
    324             return self[indices]
    325 
--> 326         col = cpp_copying.apply_gather_array(self.data.to_gpu_array(), indices)
    327 
    328         if self._column.mask:

cudf/bindings/copying.pyx in cudf.bindings.copying.apply_gather_array()

cudf/bindings/copying.pyx in cudf.bindings.copying.apply_gather_column()

cudf/bindings/copying.pyx in cudf.bindings.copying.apply_gather()

RuntimeError: parallel_for failed: out of memory

Most upvoted comments

With TPCx-BB efforts being largely successful, I’d say this has been fixed or improved substantially, is that correct @VibhuJawa? Are we good with closing this, or should we keep it open?

@pentschev, yup, with all the TPCx-BB work this has indeed improved a lot. This is good to close in my book too.

@jakirkham @kkraus14 and I were discussing this offline, so to set expectations straight:

“To share device memory pointers and events across processes, an application must use the Inter Process Communication API, which is described in detail in the reference manual. The IPC API is only supported for 64-bit processes on Linux and for devices of compute capability 2.0 and higher. Note that the IPC API is not supported for cudaMallocManaged allocations.”

Reference: https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#interprocess-communication

@jrhemstad, does managed here mean cudaMallocManaged?

@jakirkham yes, the RMM documentation has details on that: https://github.com/rapidsai/rmm#cuda-managed-memory

Note that with this PR from @shwina, it’s even easier to set the allocator mode: https://github.com/rapidsai/cudf/pull/2682

It should be as easy as just doing:

# non-pool, managed memory
set_allocator(allocator="managed")

# managed memory pool
set_allocator(allocator="managed", pool=True)
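To apply this across a dask-cuda cluster, a minimal sketch might look like the following (assuming the call above is cudf.set_allocator from that PR, and that running it once per worker via client.run is acceptable; adapt if your cudf/RMM version exposes a different API):

# Hedged sketch: switch every worker's allocator to managed memory.
# Assumes cudf.set_allocator as added in rapidsai/cudf#2682.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import cudf

cluster = LocalCUDACluster(n_workers=1)
client = Client(cluster)

def enable_managed_memory():
    # Use cudaMallocManaged-backed allocations with an RMM pool on top
    cudf.set_allocator(allocator="managed", pool=True)

# Run once on each worker process so allocations made by tasks
# go through the managed-memory allocator
client.run(enable_managed_memory)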

What I found is that memory really is the issue: in the case described here, the GPU has 16 GB of memory. Running that example with device_memory_limit='10000 MiB' fails, and just before failing the real GPU memory utilization was at 16 GB, despite the LRU tracking being under 10 GB. Reducing to device_memory_limit='5000 MiB' completes successfully, but takes 120 minutes. Going back to device_memory_limit='10000 MiB' while reducing chunksize='4096 MiB' to chunksize='1024 MiB' also finishes here, taking 71 minutes.

So what’s happening is that cuDF allocates additional memory proportional to the chunk size, which makes sense. As of now, I don’t know of a better/safer way to keep track of the entire device memory (including the memory managed by cuDF), so I don’t see another working solution for now other than using smaller chunks.

On side channels @VibhuJawa pointed out that the chunk size has a non-negligible impact on performance, so this is definitely something we want to improve in the future, but for the time being, using smaller chunk sizes is the only solution here.
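For reference, a sketch of the settings that completed here (values mirror the reproducer above and the 16 GB GPU used for these tests; the right numbers will depend on your GPU and data):

# Illustrative settings only; workload- and GPU-dependent.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask_cudf

# Keep the 10 GB spill threshold but read smaller chunks (~71 minutes here)...
cluster = LocalCUDACluster(n_workers=1, device_memory_limit='10000 MiB')
client = Client(cluster)

df = dask_cudf.read_csv('test.csv', chunksize='1024 MiB')  # was '4096 MiB'
df = df.sort_values(['A', 'B', 'C'])

# ...or alternatively lower the limit itself (completed, but ~120 minutes):
# cluster = LocalCUDACluster(n_workers=1, device_memory_limit='5000 MiB')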