distributed: Timed out trying to connect ... : connect() didn't finish in time

While the following is not a bug we can pinpoint, it really bothers us: after weeks of experimentation and investigation we haven’t found a real solution.

We run something like

(a_bag
  .repartition(100_000) # outputs small partitions
  .map(very_cpu_heavy_computation) # outputs ~4MB partitions
  .repartition(1000).to_dataframe().to_parquet() # writes ~30MB parquet files from ~400MB in-memory partitions
)

on 100 workers with 1500 CPUs on Kubernetes pods.

When it reaches the last repartition step, Dask starts blocking the event loop and the workers start spamming the following logs:

distributed.utils_comm - INFO - Retrying get_data_from_worker after exception in attempt 0/2: Timed out trying to connect to 'tcp://10.2.3.20:38449' after 20 s: Timed out trying to connect to 'tcp://10.2.3.20:38449' after 20 s: connect() didn't finish in time

=> 5000 events

distributed.core - INFO - Event loop was unresponsive in Worker for 4.95s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

=> 26,094 events

We are working around that by setting:

# less connections to avoid potential network saturation and connection timeouts
export DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING=25
export DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING=5
export DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG=16384

# graceful timeout and retry policy
export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=20
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=20
export DASK_DISTRIBUTED__COMM__RETRY__COUNT=2

so that the job finally finishes, despite the communication issues.
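
For reference, the same settings can also be expressed through Dask’s configuration system instead of environment variables. The sketch below assumes the standard env-var-to-config-key mapping; note that the values must be in effect in the scheduler and worker processes themselves (e.g. via the pod environment or a mounted distributed.yaml), so setting them only in the client process is not enough.

import dask

# Config-key equivalents of the env vars above (sketch; double underscores in
# the env vars map to nested keys, single underscores to hyphens).
dask.config.set({
    "distributed.worker.connections.outgoing": 25,
    "distributed.worker.connections.incoming": 5,
    "distributed.comm.socket-backlog": 16384,
    "distributed.scheduler.allowed-failures": 20,
    "distributed.comm.timeouts.connect": "20s",
    "distributed.comm.retry.count": 2,
})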

Still, sometimes workers even start hanging and have to be restarted manually, and progress slows down significantly towards the end of the job.

I’m sharing this with the community, hoping that somebody can give pointers on what to try, or even ideas for a resolution.

Environment:

  • Dask version: 2.23.0
  • Python version: 3.6
  • Operating System: Ubuntu on Kubernetes
  • Install method (conda, pip, source): pip


Most upvoted comments

What has helped quite a bit so far:

export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=20
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=5
export DASK_DISTRIBUTED__COMM__RETRY__COUNT=10

A long connection timeout effectively makes the workers saturate the scheduler socket. A short timeout plus backoff protects the scheduler listener by spreading the connection attempts out in time. Failed tasks (which still occur) are rescheduled instead of failing the whole job. https://github.com/dask/distributed/blob/b2f594e69751e0733ac595026e04181c55119ae9/distributed/comm/core.py#L327-L332
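
To illustrate why this helps, here is a rough, simplified sketch of the retry pattern in the linked connect() code. It is illustrative only, not the actual implementation, and the function and parameter names are made up: each individual attempt is capped well below the overall timeout, and failures back off for a randomized, growing interval, so thousands of retrying workers do not all hit the listener at the same instant.

import asyncio
import random


async def connect_with_backoff(open_connection, timeout=20.0,
                               attempt_cap=5.0, backoff_base=0.01):
    # Illustrative sketch only -- not the real distributed.comm.core.connect().
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    backoff = backoff_base
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise TimeoutError("connect() didn't finish in time")
        try:
            # Cap each individual attempt well below the overall timeout.
            return await asyncio.wait_for(open_connection(),
                                          timeout=min(attempt_cap, remaining))
        except (OSError, asyncio.TimeoutError):
            # Sleep for a randomized, growing interval before retrying, which
            # spreads the reconnection attempts of many workers out in time.
            await asyncio.sleep(min(backoff, max(remaining, 0)))
            backoff = min(backoff * 2, 1.0) * random.uniform(0.8, 1.2)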

This traceback looks “ordinary” and this might just be an ordinary timeout, especially if the cluster is under heavy load; it might simply be expected behaviour. There is an ongoing discussion about increasing the default timeout value to something larger, see #4228. You might want to try increasing the value and see if this resolves the problem (try 30s or 60s if your cluster is under extreme stress).
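
A minimal sketch of trying a larger value follows; the setting has to reach the scheduler and worker processes (e.g. via the corresponding env var on the pods), not just the client.

import dask

# Try a larger connect timeout, e.g. 60s; on Kubernetes the same value would
# typically go into the pod environment as
# DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=60s so it applies to the workers too.
dask.config.set({"distributed.comm.timeouts.connect": "60s"})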

Apologies for the tone of my last message; I flinch a little when I reread what I wrote.

Don’t worry, I didn’t even notice anything. I was just a bit upset that the issue was not resolved 😃

2.30.1 has indeed helped fix our issues.

Glad to hear!

This might be fixed by #4167, can you try that branch and see if it works for you?

I would be curious what this returns on your system. On my laptop it’s 15ms.

import asyncio
import time
from typing import List

from dask.distributed import Client, get_worker
from dask.utils import format_time


async def ping(addresses: List[str]):
    # From the worker this runs on, open an RPC connection to every other
    # worker and request its identity, all concurrently.
    await asyncio.gather(*[
        get_worker().rpc(addr).identity()
        for addr in addresses
        if addr != get_worker().address
    ])


def all_ping(client: Client):
    # Run the all-to-all ping on every worker and report the wall-clock time.
    workers = list(client.scheduler_info()["workers"])
    start = time.time()
    client.run(ping, workers)
    stop = time.time()
    print(format_time(stop - start))


if __name__ == "__main__":
    with Client() as client:
        all_ping(client)

Also, due to connection pooling, running this before your computation might make things smoother in the short term.
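
As a concrete (hypothetical) usage of that suggestion, one could run the all-to-all ping once right before submitting the heavy graph, so the repartition shuffle starts with the worker-to-worker connection pools already populated; cluster_address and the parquet output path below are placeholders.

from dask.distributed import Client

# Hypothetical warm-up sketch: `all_ping` is the helper from the snippet above,
# `a_bag` and `very_cpu_heavy_computation` are from the original pipeline, and
# `cluster_address` / the parquet path are placeholders.
with Client(cluster_address) as client:
    all_ping(client)  # pre-populates worker<->worker connection pools
    (a_bag
     .repartition(100_000)
     .map(very_cpu_heavy_computation)
     .repartition(1000)
     .to_dataframe()
     .to_parquet("output/"))  # placeholder path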