ucx-py: [BUG] Serialization fails when tuple of large CuPy arrays returned from remote Dask function
I have been getting the following exception in the _fit() task for cuML’s distributed Naive Bayes when the number of features grows over about 5.5M:
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 257, in deserialize
deserializers=deserializers,
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 270, in deserialize
return loads(header, frames)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
return loads(header, frames)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 70, in cuda_deserialize_cupy_ndarray
frame = PatchedCudaArrayInterface(frame)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
distributed.utils - ERROR - 'bytes' object has no attribute '__cuda_array_interface__'
Traceback (most recent call last):
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 665, in log_errors
yield
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/ucx.py", line 192, in read
frames, deserialize=self.deserialize, deserializers=deserializers
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 73, in from_frames
res = await offload(_from_frames)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in offload
return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in <lambda>
return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 61, in _from_frames
frames, deserialize=deserialize, deserializers=deserializers
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 257, in deserialize
deserializers=deserializers,
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 270, in deserialize
return loads(header, frames)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
return loads(header, frames)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 70, in cuda_deserialize_cupy_ndarray
frame = PatchedCudaArrayInterface(frame)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
Traceback (most recent call last):
File "dev-bb-query-28.py", line 396, in <module>
result_df, acc, prec, cmat = main(client)
File "../../tools/utils.py", line 229, in profiled
result = func(*args, **kwargs)
File "dev-bb-query-28.py", line 314, in main
model.fit(X_train, y_train)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/cuml/dask/naive_bayes/naive_bayes.py", line 197, in fit
counts1 = self.client_.compute(counts, sync=True)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 2800, in compute
result = self.gather(futures)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 1893, in gather
asynchronous=asynchronous,
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 780, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
raise exc.with_traceback(tb)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
result[0] = yield future
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 1781, in _gather
response = await future
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 1832, in _gather_remote
response = await retry_operation(self.scheduler.gather, keys=keys)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
operation=operation,
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
return await coro()
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
response = await comm.read(deserializers=deserializers)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/ucx.py", line 192, in read
frames, deserialize=self.deserialize, deserializers=deserializers
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 73, in from_frames
res = await offload(_from_frames)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in offload
return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in <lambda>
return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 61, in _from_frames
frames, deserialize=deserialize, deserializers=deserializers
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 257, in deserialize
deserializers=deserializers,
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 270, in deserialize
return loads(header, frames)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
return loads(header, frames)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 70, in cuda_deserialize_cupy_ndarray
frame = PatchedCudaArrayInterface(frame)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 200, in ignoring
yield
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/deploy/spec.py", line 607, in close_clusters
cluster.close(timeout=10)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 81, in close
return self.sync(self._close, callback_timeout=timeout)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 160, in sync
return sync(self.loop, func, *args, **kwargs)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 345, in sync
e.wait(10)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/threading.py", line 552, in wait
signaled = self._cond.wait(timeout)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/threading.py", line 300, in wait
gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 201, in ignoring
except exceptions as e:
TypeError: catching classes that do not inherit from BaseException is not allowed
For some reason, the CuPy deserializer appears to be receiving a frame of type bytes rather than an object exposing __cuda_array_interface__. I modified the deserializer noted in the stack trace to print out the header of the offending message.
Here’s the modified deserializer function:
@cuda_deserialize.register(cupy.ndarray)
def cuda_deserialize_cupy_ndarray(header, frames):
    (frame,) = frames
    # Debug: report the type of the incoming frame.
    print("Deserializing Frame: %s" % str(type(frame)))
    if not isinstance(frame, cupy.ndarray):
        # Debug: dump the header when the frame arrives as plain bytes.
        if isinstance(frame, bytes):
            print("Header from bytes from: %s" % str(header))
        frame = PatchedCudaArrayInterface(frame)
    arr = cupy.ndarray(
        shape=header["shape"],
        dtype=header["typestr"],
        memptr=cupy.asarray(frame).data,
        strides=header["strides"],
    )
    return arr
And here is the message header passed to that function when the CuPy array (whose width is the number of Naive Bayes features) grows too large:
{'shape': (3, 8388608), 'typestr': '<f4', 'descr': (('', '<f4'),), 'version': 2, 'strides': (4, 12), 'data': (139841484357632, False), 'type-serialized': b'\x80\x04\x95\x1e\x00\x00\x00\x00\x00\x00\x00\x8c\x0ecupy.core.core\x94\x8c\x07ndarray\x94\x93\x94.', 'serializer': 'cuda', 'compression': (None,)}
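For reference, the header above describes a single frame of 3 × 8,388,608 float32 values. A quick back-of-the-envelope check (my own snippet, not part of the original debug output):

import numpy as np

# Expected payload size for the frame described by the header above.
shape = (3, 8388608)                    # header["shape"]
itemsize = np.dtype("<f4").itemsize     # header["typestr"] -> little-endian float32
print(shape[0] * shape[1] * itemsize)   # 100663296 bytes, i.e. 96 MiB

So this frame is expected to carry roughly 96 MiB of device data, yet what arrives for it is a plain bytes object rather than an object exposing __cuda_array_interface__.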
This only happens when using protocol=UCX in Dask; it is not reproducible with protocol=TCP. What's also very strange is that it only happens when I compute the resulting futures of _fit() directly, which ends up transferring a tuple containing two CuPy arrays.
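For illustration, here is a minimal sketch of the failing pattern as I understand it (this is not the actual cuML code; the cluster setup, sizes, and function names are my own assumptions): a remote task returns a tuple of two large CuPy arrays, and the client gathers that tuple over UCX.

import cupy as cp
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

def make_arrays():
    # Return a tuple of two CuPy arrays, the larger one sized similarly
    # to the per-class feature counts produced by the distributed _fit().
    counts = cp.zeros((3, 8_388_608), dtype=cp.float32)
    class_counts = cp.zeros(3, dtype=cp.float32)
    return counts, class_counts

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    fut = client.submit(make_arrays)
    # Gathering the tuple over UCX is where the deserialization error
    # shows up; the same call over TCP completes without error.
    result = client.gather(fut)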
As a short-term workaround, I've modified the Naive Bayes code to fetch each array individually, and this works without error. It is not ideal, however: the Naive Bayes code becomes more complex, and two extra tasks have to be scheduled just to fetch the results of an upstream task that should be able to return them directly.
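The workaround boils down to pulling each element of the tuple back as its own future rather than gathering the tuple in one shot. A rough sketch of that pattern (hypothetical names; fit_future stands in for the future of the upstream _fit() task):

from operator import getitem

# fit_future is the future for the remote task that returns
# (counts, class_counts). Schedule one extra task per element and
# gather the elements individually instead of the whole tuple.
counts_future = client.submit(getitem, fit_future, 0)
class_counts_future = client.submit(getitem, fit_future, 1)
counts, class_counts = client.gather([counts_future, class_counts_future])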
I've filed this issue in ucx-py rather than Dask because it only seems to occur when UCX is used.
About this issue
- State: closed
- Created 4 years ago
- Comments: 23 (23 by maintainers)
I was able to reproduce with the following little script.
Specifically, the issue happened when I increased the number of features of the input (hv = HashingVectorizer(n_features=8000000)). So far I've been finding that the issue starts to appear after about 5.5M features.
It's important to point out that the current Naive Bayes implementation in cuML (branch-0.13 at the time of writing) already implements a workaround, so reproducing this issue takes a very small change to the Dask Naive Bayes. I modified the script in place in conda:
~/miniconda3/envs/cuml_dev/lib/python3.7/site-packages/cuml/dask/naive_bayes/naive_bayes.py
I added the following line below this one:
For now, this is the most accurate reproduction of the behavior I’m seeing end to end. I’m looking forward to isolating and fixing the root cause so that I can remove the workaround from cuML.