distributed: Computation deadlocks after inter-worker communication error

Before I start, sincere apologies for not having a reproducible example, but the issue is a bit weird and hard to reproduce. I'm mostly asking here to see if anybody has an idea what might be going on.

What happened:

We have seen multiple times now that individual computations get stuck in the processing state and are never processed. A closer investigation revealed in all occurrences that the worker the processing task is on, let's call it WorkerA, is in the process of fetching the task's dependencies from WorkerB. While fetching these, an exception seems to be triggered on WorkerB and for some reason this exception is lost. In particular, WorkerA never gets any signal that something went wrong and waits indefinitely for the dependency to arrive, while WorkerB has already forgotten about the error. This scenario ultimately leads to a deadlock where a single dependency fetch blocks the cluster, and the cluster never actually self-heals. The last time I observed this issue, the computation was blocked for about 18 hours. After manually closing all connections, the workers retried and the graph finished successfully.
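
As an aside, each worker's view of its in-flight dependency fetches can be inspected from the client. The sketch below is purely diagnostic and relies on the internal Worker.in_flight_workers mapping, whose name and shape are version-dependent assumptions; the scheduler address is made up.

# Diagnostic sketch only: Worker.in_flight_workers is internal, undocumented
# state and may differ between distributed versions.
from distributed import Client, get_worker

def dump_in_flight():
    w = get_worker()
    # Peers this worker believes it is currently fetching dependencies from,
    # and the keys it is still waiting for.
    return {peer: sorted(keys) for peer, keys in getattr(w, "in_flight_workers", {}).items()}

client = Client("tcp://scheduler-address:8786")  # hypothetical address
print(client.run(dump_in_flight))                # {worker_address: {peer: [keys, ...]}}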

I’ve seen this deadlock happen multiple times already, but only during the last instance could I glimpse an error log connecting it to issue #4130, where the tornado stream seems to cause this error. Regardless of what caused the error, I believe there is an issue on the distributed side, since the error was not properly propagated, the connections were not closed, etc., suggesting to WorkerA that everything was fine.

Traceback
Traceback (most recent call last):
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/core.py", line 513, in handle_comm
    result = await result
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/worker.py", line 1284, in get_data
    compressed = await comm.write(msg, serializers=serializers)
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/distributed/comm/tcp.py", line 243, in write
    future = stream.write(frame)
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/tornado/iostream.py", line 553, in write
    self._write_buffer.append(data)
  File "/mnt/mesos/sandbox/venv/lib/python3.6/site-packages/tornado/iostream.py", line 177, in append
    b += data  # type: ignore
BufferError: Existing exports of data: object cannot be re-sized

The above traceback shows that the error appeared while WorkerB was handling the request, executing the get_data handler, ultimately raising the exception here: https://github.com/dask/distributed/blob/34fb932855df7fe2fb18e1076ede2dd0eb3d95d0/distributed/core.py#L523-L535

In this code section we can see that the exception is caught and logged. Following the code, an error result is generated which should be sent back to WorkerA, causing the dependency fetch to fail and triggering a retry:

https://github.com/dask/distributed/blob/34fb932855df7fe2fb18e1076ede2dd0eb3d95d0/distributed/core.py#L544-L554

Even if this fails (debug logs are not enabled), the connection should eventually be closed/aborted and removed from the server's tracking:

https://github.com/dask/distributed/blob/34fb932855df7fe2fb18e1076ede2dd0eb3d95d0/distributed/core.py#L561-L569

which never happens. I checked the still-running workers and the self._comm attribute was still populated with the seemingly broken connection.
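
To make the flow I expect explicit, here is a condensed paraphrase of that handler path; it is only a sketch based on my reading of the linked core.py lines (names such as error_message, server.handlers and server._comms follow that reading), not the actual source.

import logging

from distributed.core import error_message  # serializes an exception into a reply message

logger = logging.getLogger(__name__)

async def handle_comm_sketch(server, comm):
    # Condensed paraphrase of the Server.handle_comm error path, not the real code.
    msg = await comm.read()
    handler = server.handlers[msg.pop("op")]   # e.g. get_data on WorkerB
    try:
        result = await handler(comm, **msg)    # the BufferError is raised in here
    except Exception as e:
        logger.exception(e)                    # the exception is caught and logged ...
        result = error_message(e)              # ... and turned into an error reply
    try:
        await comm.write(result)               # should make WorkerA's fetch fail and retry
    finally:
        # Whether or not the reply could be written, the connection is supposed
        # to be dropped and forgotten by the server at this point.
        comm.abort()
        server._comms.pop(comm, None)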

Why was this finally block never executed? Why was the reply never sent to or received by WorkerA?

Just looking at the code, I would assume this is already implemented robustly, but it seems I'm missing something. Does anybody have a clue about what might be going on?

Environment

  • Distributed version: 2.20.0
  • Python version: 3.6
  • Operating System: Debian
  • Tornado version: 6.0.4
  • Install method (conda, pip, source): pip

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 15 (15 by maintainers)

Most upvoted comments

l. 546 in core.py (as you cited it) runs just fine without an exception

If it was running just fine, it would trigger an exception on the listener side, wouldn’t it?

I would not expect this: the BufferError means this is a short write. So I guess the other end just waits for the rest (which never comes, or comes only as garbage, intermingled with the exception sent here).

If the write would somehow block / not terminate that would explain the issue.

My guess is this read blocks on the other end somewhere around here:

https://github.com/dask/distributed/blob/7d769b8d87197211f6bd7c91335f9712fc4da949/distributed/comm/tcp.py#L193-L198
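
A minimal stand-alone illustration of that theory (plain asyncio rather than distributed's tornado-based comm; the length-prefixed framing here is only an assumption for the sketch): the reader learns the frame length up front and then blocks until that many bytes arrive, so a short write on the other side leaves it waiting forever.

import asyncio

async def read_frame(r: asyncio.StreamReader) -> bytes:
    n = int.from_bytes(await r.readexactly(8), "big")  # announced frame length
    return await r.readexactly(n)                      # hangs if the writer stopped short

async def main() -> None:
    async def handle(r, w):
        try:
            await asyncio.wait_for(read_frame(r), timeout=2)
        except asyncio.TimeoutError:
            print("reader is still waiting: the short write never completed")
        finally:
            w.close()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    _, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write((100).to_bytes(8, "big"))  # announce a 100-byte frame ...
    writer.write(b"only ten b")             # ... but deliver 10 bytes and stop (the short write)
    await writer.drain()

    await asyncio.sleep(3)                  # give the server side time to hit its timeout
    writer.close()
    server.close()
    await server.wait_closed()

asyncio.run(main())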

Do you have a suggestion as to why we do not see any exception logs (see line 534 in core.py)?

I thought this was the traceback you posted?

if the serialized exception exceeds 2048 bytes which would not re-trigger this same BufferError

Not really important here, but why would the size of the exception impact this? I was still under the assumption that this buffer error was not yet well understood.

tornado behaves differently depending on the size passed to write. The code path that potentially triggers the BufferError is only hit for writes smaller than 2048 bytes (while exceptions sent by distributed can, by default, be up to 10kb):

https://github.com/tornadoweb/tornado/blob/b4e39e52cd27d6b3b324a399dff046f71545c4a5/tornado/iostream.py#L164-L177
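
For reference, the BufferError itself can be reproduced in isolation. In this sketch a plain memoryview stands in for whatever is still holding an export of the write buffer while tornado's small-write append path (b += data) tries to resize it; that this is the actual trigger in distributed is an assumption.

buf = bytearray(b"previous frame not yet flushed")
view = memoryview(buf)                 # an outstanding export of the buffer
try:
    buf += b"small follow-up write"    # the append path used for writes < 2048 bytes
except BufferError as err:
    print(err)                         # Existing exports of data: object cannot be re-sized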