distributed: distributed.comm.core.CommClosedError: in : Stream is closed

I have 2 machines: a worker machine and a scheduler machine.

The worker machine runs CentOS 7 with Python 3.7 and dask-distributed 2.5.2.

The scheduler machine runs a Docker container. The container has the same versions of Python and dask and, incidentally, is also a CentOS 7 image.

I start the scheduler docker container with this docker-compose yaml:

    version: '3.7'
    services:
      service1:
        image: ...
        container_name: ...
        network_mode: bridge
        env_file:
          - ~/.env
        ports:
          - "8888:8888"
          - "9796:9796"
          - "9797:9797"
        volumes:
          ...
        command: jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root

(Notice I’m mapping the two ports the scheduler needs: 9796 for the scheduler itself and 9797 for the dashboard.)

When I start a dask-worker on the worker box and a dask-scheduler in the Docker container, everything seems to initialize correctly, except that after a little while I get this error:

[root@510b0c5af190 web]# my_project.py run distributed_workflow
Traceback (most recent call last):
  File "/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 237, in write
    stream.write(b)
  File "/conda/lib/python3.7/site-packages/tornado/iostream.py", line 546, in write
    self._check_closed()
  File "/conda/lib/python3.7/site-packages/tornado/iostream.py", line 1009, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/conda/bin/sql_server", line 11, in <module>
    load_entry_point('sql-server', 'console_scripts', 'sql_server')()
  File "/conda/lib/python3.7/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/conda/lib/python3.7/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/conda/lib/python3.7/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/conda/lib/python3.7/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/conda/lib/python3.7/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/app/sql_server/cli/sql_server.py", line 28, in run
    daily(True)
  File "/app/sql_server/cli/run/daily.py", line 166, in daily
    verbose=True,
  File "/wcf/spartans/spartans/spartans.py", line 116, in __enter__
    self.open()
  File "/wcf/spartans/spartans/spartans.py", line 123, in open
    self._start_agents()
  File "/wcf/spartans/spartans/spartans.py", line 179, in _start_agents
    set_as_default=False)
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 715, in __init__
    self.start(timeout=timeout)
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 880, in start
    sync(self.loop, self._start, **kwargs)
  File "/conda/lib/python3.7/site-packages/distributed/utils.py", line 333, in sync
    raise exc.with_traceback(tb)
  File "/conda/lib/python3.7/site-packages/distributed/utils.py", line 317, in f
    result[0] = yield future
  File "/conda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 973, in _start
    await self._ensure_connected(timeout=timeout)
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 1040, in _ensure_connected
    {"op": "register-client", "client": self.id, "reply": False}
  File "/conda/lib/python3.7/site-packages/tornado/gen.py", line 748, in run
    yielded = self.gen.send(value)
  File "/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 254, in write
    convert_stream_closed_error(self, e)
  File "/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 132, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

So I investigate the logs. The worker log looks like this:

(base) [worker@worker-03 tmp]$ cat worker_output.txt
distributed.nanny - INFO -         Start Nanny at: 'tcp://10.1.25.3:43111'
distributed.diskutils - INFO - Found stale lock file and directory '/home/worker/worker-h8jhtnng', purging
distributed.worker - INFO -       Start worker at:      tcp://10.1.25.3:42739
distributed.worker - INFO -          Listening to:      tcp://10.1.25.3:42739
distributed.worker - INFO -          dashboard at:            10.1.25.3:40970
distributed.worker - INFO - Waiting to connect to:   tcp://scheduler.myco.com:9796
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                   33.53 GB
distributed.worker - INFO -       Local Directory: /home/worker/worker-mf3marrd
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://scheduler.myco.com:9796
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

and my scheduler log (inside my docker container, on the scheduler.myco.com machine) looks like this:

[root@510b0c5af190 web]# cat /tmp/worker_output.txt 
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-lq7oa5sc
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://172.17.0.2:9796
distributed.scheduler - INFO -   dashboard at:                     :9797
distributed.scheduler - INFO - Register tcp://10.1.25.3:42739
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.1.25.3:42739
distributed.core - INFO - Starting established connection

Now, there are no errors in the logs. Indeed, when I inspect the running processes I see this:

Worker machine

worker  8974  1.6  0.1 394088 45044 ?        Sl   16:04   0:14 /data/anaconda3/bin/python /data/anaconda3/bin/dask-worker scheduler.myco.com:9796 --dashboard-prefix my_workflow

Scheduler container:

root        33  1.4  1.4 670884 115056 pts/0   Sl   16:04   0:15 /conda/bin/python /conda/bin/dask-scheduler --port 9796 --dashboard-address 172.17.0.2:9797 --dashboard-prefix my_workflow

Notice that 172.17.0.2 is the address inside the scheduler container. If I instead try to use the host machine’s hostname as the dashboard address, I get [Errno 99] Cannot assign requested address, presumably because port 9797 is already taken by the Docker container.

Anyway, these processes are still running, yet as far as I can tell they’re not working on the workflow I tried to submit. Can you please help me understand what I’m doing wrong to produce the error above?
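
For what it’s worth, everything outside the container (the worker, and any remote client) reaches the scheduler through the port published on the Docker host rather than through 172.17.0.2. A minimal sketch of such a client connection, assuming the hostname from the logs above (this is not my actual client code):

from dask.distributed import Client

# Sketch only: connect through the port published on the Docker host
# (scheduler.myco.com:9796), never the container-internal 172.17.0.2 address.
client = Client("tcp://scheduler.myco.com:9796", timeout=30)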

About this issue

  • State: open
  • Created 5 years ago
  • Comments: 24 (9 by maintainers)

Most upvoted comments

I too am facing this issue, and it won’t go away with dask/distributed 2.20.

Hi,

I ran into the same issue with a graph I created using Dask futures. It turns out it doesn’t affect the result in my case, and I strongly suspect it happens when a task is writing a file (a .csv in my case) and takes longer than a predefined timeout (I checked the dask params and comm.timeouts.connect is indeed set to 10s, but I’m not convinced it’s that one, since I tried setting it to 30s and it kept crashing). Anyway, my guess is that during a long task the worker can’t communicate, so the scheduler sees it as dead and breaks the connection. I found a workaround by changing the number of retries in comm.retry.count (used in distributed.utils_comm) to 2. No more connection issues. Again, I’m sure it depends on your code / how long it or a task runs.
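
Concretely, the change looks roughly like this from Python (a sketch; the values are just the ones discussed here, and the same keys can also go into a YAML file under ~/.config/dask/):

import dask
import dask.distributed  # importing distributed registers its config defaults

# The workaround described above: allow a couple of retries on failed comms.
dask.config.set({"distributed.comm.retry.count": 2})  # default is 0 retries

# Bumping the connect timeout is the other knob people reach for, though it
# did not help in my case:
# dask.config.set({"distributed.comm.timeouts.connect": "30s"})  # default 10s

# Sanity check: read back what is in effect.
print(dask.config.get("distributed.comm.retry.count"))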

Hope it helps.

python 3.6.9 dask 2.9.1 distributed 2.9.1 tornado 6.0.3

PS: maybe this has been fixed in a more recent version of distributed, but my company still runs 2.9.1.

Any news on this topic? I’m still having this problem with Dask 2023.3.2 and Distributed 2023.3.2.

I created a reproducible example. It isn’t minimal, but it is fairly compact. Based on the stack trace, the error seems to take a different code path than what previous people have reported.

The first run is usually successful, but executing it a second time immediately afterwards leads to the error. I have no dask config files.

I cannot reproduce it with pure=False, and I also cannot reproduce it when setting n_workers=2 with no adapt().

import random
import string
from pathlib import Path

import dask
from dask.distributed import Client, LocalCluster


def dummy(*args, **kwargs):
    import time

    time.sleep(10)
    return True


def test():
    delayed_function = dask.delayed(dummy, pure=True)
    targets = [
        delayed_function(delayed_function),
        delayed_function(delayed_function),
        delayed_function(delayed_function, delayed_function),
    ]

    random_suffix = "".join(random.choices(string.ascii_letters + string.digits, k=10))
    with LocalCluster(
        local_directory=Path(".") / f"dask_{random_suffix}",
        threads_per_worker=1,
        processes=True,
        n_workers=1,
    ) as cluster:
        cluster.adapt(minimum=1, maximum=2)
        with Client(cluster) as client:
            print(dask.compute(*targets, scheduler=client))


if __name__ == "__main__":
    test()

Typical output:

$ python test_tcp.py
(True, True, True)
Task was destroyed but it is pending!
task: <Task pending name='Task-152' coro=<AdaptiveCore.adapt() done, defined at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py:179> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7c6f4a75b0>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py:688]>


$ python test_tcp.py
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f475e2d68e0>>, <Task finished name='Task-65' coro=<Worker.close() done, defined at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/worker.py:1169> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/worker.py", line 1193, in close
    await r.close_gracefully()
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/core.py", line 858, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/core.py", line 641, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
(True, True, True)
Task was destroyed but it is pending!
task: <Task pending name='Task-152' coro=<AdaptiveCore.adapt() done, defined at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py:179> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0c8548f070>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py:688]>

edit: python 3.8.6 dask 2020.12.0 distributed 2021.01.1 tornado 6.1

The comm.retry.count workaround described earlier in this thread solves my problem. After setting both distributed.comm.retry.count and distributed.comm.timeouts.connect to larger values, the error disappears.

dask.config.set({"distributed.comm.retry.count": 10})
dask.config.set({"distributed.comm.timeouts.connect": 30})

In my case, it seems the (SSD) disk has really high response times when data falls out of cache, which slows down the worker, lengthens its response times, and causes the network problem.

I’m trying to calculate a huge distance matrix, about 5000*5000, which writes to disk frequently to store out-of-memory data. If I switch to a smaller matrix calculation, which requires little or no disk writing, the error disappears. So the guess above seems quite reasonable to me.

Update:

The error still shows up with a larger distance matrix of 100000*100000. But this time the error has a different cause: the nanny terminates the worker when worker memory exceeds 95%.

After configuring dask not to terminate the worker, the error disappears.

dask.config.set({"distributed.worker.memory.terminate": False})

Ref: https://stackoverflow.com/questions/57997463/dask-warning-worker-exceeded-95-memory-budget
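
For context, terminate is the last of several worker-memory thresholds in the distributed config. A rough sketch of the related knobs (the fractions shown are the commonly documented defaults; double-check them against the distributed.yaml shipped with your version):

import dask
import dask.distributed  # importing distributed registers its config defaults

# Worker memory thresholds, as fractions of the per-worker memory limit
# (the values below are the usual defaults, listed here for reference only):
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling to disk
    "distributed.worker.memory.spill": 0.70,      # spill aggressively
    "distributed.worker.memory.pause": 0.80,      # pause new task execution
    "distributed.worker.memory.terminate": 0.95,  # nanny kills the worker
})

# The workaround above simply disables that last step:
dask.config.set({"distributed.worker.memory.terminate": False})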

Another vote for addressing this issue. As others have said, it’s inconsistent and difficult to reproduce, but it’s definitely there. We are currently messing with setting the number of workers and memory allocation for each worker as that has fixed similar issues in the past.

dask==2.9.2
distributed==2.6.0

I’ve done a bit of experimentation, and it looks like it is fixed by setting the memory limit per worker and the number of workers to something other than their default values. I have absolutely no idea why this fixes it.
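
For illustration, pinning both explicitly with a LocalCluster looks roughly like this (a sketch; the numbers are placeholders, not the values we settled on):

from dask.distributed import Client, LocalCluster

# Sketch: fix the worker count and the per-worker memory limit instead of
# relying on the defaults; tune the numbers to your machine and workload.
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="4GB")
client = Client(cluster)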

I’m having a similar issue outside containers:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x1096bbd10>>, <Task finished coro=<Worker.heartbeat() done, defined at /Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/worker.py:881> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/comm/tcp.py", line 188, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/worker.py", line 918, in heartbeat
    raise e
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/worker.py", line 891, in heartbeat
    metrics=await self.get_metrics(),
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
    operation=operation,
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
    return await coro()
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/comm/tcp.py", line 208, in read
    convert_stream_closed_error(self, e)
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/comm/tcp.py", line 123, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

I’m running a test suite with pytest and it happens “randomly”. I was not able to reproduce it by running a single test, and my tests are actually reported as passed…

OSX, py3.7, dask 2.10.1, distributed 2.10.0