ucx-py: [BUG] Serialization fails when tuple of large CuPy arrays returned from remote Dask function

I have been getting the following exception from the _fit() task in cuML’s distributed Naive Bayes once the number of features grows beyond about 5.5M:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 257, in deserialize
    deserializers=deserializers,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 270, in deserialize
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 70, in cuda_deserialize_cupy_ndarray
    frame = PatchedCudaArrayInterface(frame)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
    self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
distributed.utils - ERROR - 'bytes' object has no attribute '__cuda_array_interface__'
Traceback (most recent call last):
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/ucx.py", line 192, in read
    frames, deserialize=self.deserialize, deserializers=deserializers
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 73, in from_frames
    res = await offload(_from_frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 257, in deserialize
    deserializers=deserializers,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 270, in deserialize
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 70, in cuda_deserialize_cupy_ndarray
    frame = PatchedCudaArrayInterface(frame)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
    self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
Traceback (most recent call last):
  File "dev-bb-query-28.py", line 396, in <module>
    result_df, acc, prec, cmat = main(client)
  File "../../tools/utils.py", line 229, in profiled
    result = func(*args, **kwargs)
  File "dev-bb-query-28.py", line 314, in main
    model.fit(X_train, y_train)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/cuml/dask/naive_bayes/naive_bayes.py", line 197, in fit
    counts1 = self.client_.compute(counts, sync=True)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 2800, in compute
    result = self.gather(futures)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 1893, in gather
    asynchronous=asynchronous,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 1781, in _gather
    response = await future
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 1832, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
    operation=operation,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
    return await coro()
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/ucx.py", line 192, in read
    frames, deserialize=self.deserialize, deserializers=deserializers
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 73, in from_frames
    res = await offload(_from_frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 257, in deserialize
    deserializers=deserializers,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 270, in deserialize
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 70, in cuda_deserialize_cupy_ndarray
    frame = PatchedCudaArrayInterface(frame)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
    self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 200, in ignoring
    yield
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/deploy/spec.py", line 607, in close_clusters
    cluster.close(timeout=10)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 81, in close
    return self.sync(self._close, callback_timeout=timeout)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 160, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 345, in sync
    e.wait(10)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 201, in ignoring
    except exceptions as e:
TypeError: catching classes that do not inherit from BaseException is not allowed

For some reason, the CuPy deserializer appears to be receiving a frame of type bytes rather than an object exposing __cuda_array_interface__. I modified the deserializer noted in the stack trace to print out the header of the offending message.

Here’s the modified deserializer function:

# modified copy of cuda_deserialize_cupy_ndarray from distributed/protocol/cupy.py,
# with debug prints added to inspect the offending frame and header
@cuda_deserialize.register(cupy.ndarray)
def cuda_deserialize_cupy_ndarray(header, frames):
    (frame,) = frames
    print("Deserializing Frame: %s" % str(type(frame)))  # debug print
    if not isinstance(frame, cupy.ndarray):
        if isinstance(frame, bytes):
            print("Header for bytes frame: %s" % str(header))  # debug print
        frame = PatchedCudaArrayInterface(frame)
    arr = cupy.ndarray(
        shape=header["shape"],
        dtype=header["typestr"],
        memptr=cupy.asarray(frame).data,
        strides=header["strides"],
    )
    return arr

And here’s the message header passed to that function when the CuPy array (i.e., the number of Naive Bayes features) grows too large:

{'shape': (3, 8388608), 'typestr': '<f4', 'descr': (('', '<f4'),), 'version': 2, 'strides': (4, 12), 'data': (139841484357632, False), 'type-serialized': b'\x80\x04\x95\x1e\x00\x00\x00\x00\x00\x00\x00\x8c\x0ecupy.core.core\x94\x8c\x07ndarray\x94\x93\x94.', 'serializer': 'cuda', 'compression': (None,)}
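Just as a sanity check on the numbers in that header (nothing here is specific to Dask or UCX): the strides (4, 12) indicate a column-major layout, and the frame describes roughly 96 MiB of float32 data.

import numpy as np

shape = (3, 8388608)
itemsize = np.dtype("<f4").itemsize       # 4 bytes per float32 element
nbytes = shape[0] * shape[1] * itemsize
print(nbytes, nbytes / 2**20)             # 100663296 bytes, i.e. 96 MiB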

This only happens with protocol=ucx in Dask; it is not reproducible with protocol=tcp. What’s also very strange is that it only happens when I compute the resulting futures of _fit() directly, which ends up transferring a tuple containing two CuPy arrays.

As a short-term workaround, I’ve modified the Naive Bayes code to fetch each array individually, and this seems to work without error. This is not ideal, however: the Naive Bayes code becomes more complex, and two extra tasks have to be scheduled just to fetch the results of an upstream task that should be returnable directly. A rough sketch of that workaround is shown below.
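For illustration only, the per-array fetch looks roughly like this sketch. get_element is a hypothetical helper name and the real cuML change differs in detail, but the idea is to schedule one extra task per tuple element instead of gathering the tuple directly:

def get_element(t, i):
    # hypothetical helper: pull a single element out of the tuple returned by _fit()
    return t[i]

# counts is the list of futures returned by the _fit() submits
count_futures = [self.client_.submit(get_element, f, 0) for f in counts]
class_futures = [self.client_.submit(get_element, f, 1) for f in counts]
counts1 = self.client_.compute(count_futures, sync=True)
classes1 = self.client_.compute(class_futures, sync=True)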

I’ve filed this issue in ucx-py rather than Dask because it only seems to occur when UCX is used.

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 23 (23 by maintainers)

Most upvoted comments

I was able to reproduce with the following little script.

import cupy as cp
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask.array  # imported explicitly so dask.array.from_array below does not rely on transitive imports
from cuml.dask.common import to_sp_dask_array
from cuml.dask.naive_bayes import MultinomialNB
cluster = LocalCUDACluster(protocol="ucx")
client = Client(cluster)
twenty_train = fetch_20newsgroups(subset='train',
                                  shuffle=True, random_state=42)
hv = HashingVectorizer(n_features=8000000)
xformed = hv.fit_transform(twenty_train.data)
X = to_sp_dask_array(xformed, client)
y = dask.array.from_array(twenty_train.target, asarray=False,
                          fancy=False).astype(cp.int32)
model = MultinomialNB()
model.fit(X, y)

Specifically, the issue happened when I increased the number of features of the input (hv = HashingVectorizer(n_features=8000000)). So far I’ve found that I start to see the issue above about 5.5M features.

It’s important to point out that the current Naive Bayes implementation in cuML (branch-0.13 at the time of writing) already implements a workaround, so reproducing this issue takes a very small change to the Dask Naive Bayes code. I modified the script in place in conda: ~/miniconda3/envs/cuml_dev/lib/python3.7/site-packages/cuml/dask/naive_bayes/naive_bayes.py

I added the final line (counts1 = ...) immediately after this existing block:

        counts = [self.client_.submit(
            MultinomialNB._fit,
            p,
            classes,
            self.kwargs,
            workers=[w]
        ) for w, p in worker_parts.items()]
        counts1 = self.client_.compute(counts, sync=True)

For now, this is the most accurate reproduction of the behavior I’m seeing end to end. I’m looking forward to isolating and fixing the root cause so that I can remove the workaround from cuML.
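If it helps narrow things down, a more minimal variant along these lines (no cuML, just a tuple of two large CuPy arrays pulled back to the client over UCX) might be worth trying. I haven’t verified that this alone triggers the failure, and the array shapes are just guesses based on the header above:

import cupy as cp
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

def make_tuple():
    # two device arrays roughly the size of the Naive Bayes count matrices
    return (cp.zeros((3, 8388608), dtype=cp.float32),
            cp.zeros(8388608, dtype=cp.float32))

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    fut = client.submit(make_tuple)
    result = fut.result()  # gathering the tuple forces it back through the UCX comm
    print([type(x) for x in result])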