channels_redis: Update to v4 results in "RuntimeError: Event loop is closed"

After upgrading to channels-redis==4.0.0, our celery tasks are all reporting the following traceback:

future: <Task finished name='Task-9' coro=<Connection.disconnect() done, defined at /usr/local/lib/python3.9/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.9/asyncio/streams.py", line 353, in close
    return self._transport.close()
  File "/usr/local/lib/python3.9/asyncio/selector_events.py", line 698, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 751, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
[2022-10-12 07:30:31,972] [ERROR] [asyncio] Task exception was never retrieved
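The last frames of this traceback show the mechanism: closing the stream writer calls `transport.close()`, which schedules its connection-lost callback with `loop.call_soon()`, and `call_soon()` refuses to schedule anything on a loop that has already been closed. The stdlib-only sketch below reproduces the same `RuntimeError` without Redis. `schedule_on_closed_loop` is an illustrative name, and the loop is closed by hand to stand in for a loop that was shut down before the connection got cleaned up.

```python
import asyncio


def schedule_on_closed_loop() -> str:
    """Reproduce the final frames of the traceback with plain asyncio."""
    loop = asyncio.new_event_loop()
    loop.close()  # stand-in for a loop that was shut down before cleanup ran
    try:
        # Transport.close() does the equivalent of this internally:
        # loop.call_soon(self._call_connection_lost, None)
        loop.call_soon(print, "never runs")
    except RuntimeError as exc:
        return str(exc)
    return "no error"


print(schedule_on_closed_loop())  # Event loop is closed
```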

Downgrading the image to channels-redis==3.4.1 resolves the issue, so I’m starting here. This seems likely to be related to #312.

The image OS is Debian Bullseye (amd64). The Django application is running with gunicorn.

Probably related packages:

celery==5.2.7
channels==3.0.5
channels-redis==4.0.0
hiredis==2.0.0
redis==4.3.4

Full Pipfile.lock: https://github.com/paperless-ngx/paperless-ngx/blob/dev/Pipfile.lock

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 29
  • Comments: 35 (15 by maintainers)


Most upvoted comments

For anyone else with this issue: it doesn’t occur when using the newer RedisPubSubChannelLayer. It’s not in the Channels docs because it’s still in beta.

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
        "CONFIG": {
            "hosts": ["rediss://***:***@***.com:6379"],
        },
    },
}

The issue here is that channel_layer.close_pools() is never called.

See https://github.com/django/channels/issues/1966#issuecomment-1400903327

I’ve tracked it down. The move to redis-py requires closing connections explicitly.

Utilizing asyncio Redis requires an explicit disconnect of the connection …

So to get the shell example to pass cleanly we need to do something like this:

>>> import channels.layers
>>> from asgiref.sync import async_to_sync
>>>
>>> channel_layer = channels.layers.get_channel_layer()
>>>
>>> async def closing_send(channel_layer, channel, message):
...     await channel_layer.send(channel, message)
...     await channel_layer.close_pools()
...
>>> async_to_sync(closing_send)(channel_layer, 'test_channel', {'type': 'hello'})
>>> async_to_sync(channel_layer.receive)('test_channel')
{'type': 'hello'}

Even then, we should really do the same for the receive call (a further call would hit the same issue).

If you’re using async_to_sync(), you’ll need to wrap calls in a function that ensures close_pools() is called at the end, since the event loop is shut down when the concurrent context ends.
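That wrapping can be written once as a decorator, so every coroutine handed to `async_to_sync()` closes the layer's pools on the same loop before that loop goes away, for send and receive alike. This is a sketch, not an API of channels or channels_redis: `closes_pools` and `FakeLayer` are hypothetical names, `FakeLayer` only imitates the `send`/`receive`/`close_pools` coroutines of RedisChannelLayer, and `asyncio.run()` stands in for `async_to_sync()` because, in the common case where no outer loop is running, both run each call on a fresh loop that is closed afterwards.

```python
import asyncio
import functools


def closes_pools(layer):
    """Hypothetical decorator: always await layer.close_pools() on the same loop."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            finally:
                # Runs before the per-call event loop is shut down.
                await layer.close_pools()
        return wrapper
    return decorator


class FakeLayer:
    """Stand-in for RedisChannelLayer; records calls instead of talking to Redis."""

    def __init__(self):
        self.messages = {}
        self.closed_on = []  # loops on which close_pools() ran

    async def send(self, channel, message):
        self.messages.setdefault(channel, []).append(message)

    async def receive(self, channel):
        return self.messages[channel].pop(0)

    async def close_pools(self):
        self.closed_on.append(asyncio.get_running_loop())


layer = FakeLayer()


@closes_pools(layer)
async def closing_send(channel, message):
    await layer.send(channel, message)


@closes_pools(layer)
async def closing_receive(channel):
    return await layer.receive(channel)


# asyncio.run() stands in for async_to_sync(): a new loop per call, closed afterwards.
asyncio.run(closing_send("test_channel", {"type": "hello"}))
print(asyncio.run(closing_receive("test_channel")))  # {'type': 'hello'}
```

With the real layer, the decorated coroutine would await `channel_layer.send(...)` and be called as `async_to_sync(closing_send)(...)`; the `finally` clause makes sure the pools are closed even if the send raises.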

This should be mentioned in the docs until an official fix is released.

Edit: Until this is resolved, I wouldn’t consider 4.0 a stable release.

Same here, and sometimes I also get:

Task exception was never retrieved
future: <Task finished name='Task-378' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
Task exception was never retrieved
future: <Task finished name='Task-381' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-381' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-381' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
Task exception was never retrieved
future: <Task finished name='Task-382' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-382' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-382' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
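This variant comes from the same root cause in the other direction: something created on one event loop is awaited from a task running on a different loop, and asyncio refuses to let a task wait on a future owned by another loop. A stdlib-only sketch of that check (no Redis involved; `await_future_from_other_loop` is an illustrative name, and the bare future stands in for a connection's internal waiter):

```python
import asyncio


def await_future_from_other_loop() -> str:
    """Await a future owned by loop A from a task running on loop B."""
    loop_a = asyncio.new_event_loop()
    foreign_future = loop_a.create_future()  # bound to loop_a

    async def wait_on_it():
        await foreign_future  # this task runs on the new loop created by asyncio.run()

    try:
        asyncio.run(wait_on_it())
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop_a.close()
    return "no error"


message = await_future_from_other_loop()
print("attached to a different loop" in message)  # True
```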

@carltongibson this is a major breaking issue, so I’m just trying to bring more attention to it.

Tomorrow I’ll go through the source and try to make a contribution.

If I use ahaltindis’s workaround detailed here, my code works. So it definitely seems to be a conflict/issue with async_to_sync.

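import asyncio

from channels.layers import get_channel_layer

channel_layer = get_channel_layer()

# Reuse this thread's event loop instead of letting async_to_sync() create (and later close) a new one per call.
loop = asyncio.get_event_loop()
coroutine = channel_layer.group_send(
    group_name,  # defined by the caller
    {
        'type': 'task.message',
        'text': context,  # defined by the caller
    })
loop.run_until_complete(coroutine)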

Perhaps the channels or channels_redis documentation could be improved to provide an example of how users should trigger a group_send from a synchronous context (such that several calls can be made)? Perhaps a sync version of group_send could be made available?
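One possible shape for such a helper, sketched under assumptions and not something channels or channels_redis provides: keep a single long-lived event loop on a dedicated thread and submit coroutines to it from synchronous code, so connections are created and reused on one loop that stays open. `BackgroundLoop` is a hypothetical class built only on `asyncio.run_coroutine_threadsafe()`.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper: one long-lived event loop running on a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever, name="channel-layer-loop", daemon=True
        )
        self._thread.start()

    def run(self, coro, timeout=None):
        """Run a coroutine on the background loop and block until it finishes."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def stop(self) -> None:
        """Stop the loop, wait for its thread to exit, then close the loop."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def current_loop():
    return asyncio.get_running_loop()


bg = BackgroundLoop()
first = bg.run(current_loop())
second = bg.run(current_loop())
print(first is second)  # True: every call shares the same loop
bg.stop()
```

In a view or Celery task this would be used as `bg.run(channel_layer.group_send(group_name, message))`, with `bg` created once per worker process. The trade-off is an extra thread per process and an explicit `stop()` at shutdown.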

Can we stop with the “me too” and “any update” comments please.

If you have significant new information to add then please do. (Great!) Otherwise it’s just noise.

I’m planning new releases over the new year, and looking into this is part of that.

@carltongibson You can also reproduce the error using this repo:

https://github.com/realsuayip/zaida/tree/490e0c5a49a750bc56a63f9cba5c9514ed91eee4

Steps to reproduce:

1 - Clone the repo
2 - Run “python3 docker.py up”
3 - Once all the containers are running, run “python3 docker.py test”

Hope it helps.

As a workaround, downgrading channels-redis to 3.4.1 works for me.

Thanks for the quick reply. So, if I understood correctly, to send a message through the channel layer outside a consumer I should do something like:

import channels.layers
from asgiref.sync import async_to_sync

async def closing_send(channel, message):
    channel_layer = channels.layers.get_channel_layer()
    await channel_layer.send(channel, message)
    await channel_layer.close_pools()

async_to_sync(closing_send)("test-channel", {"type": "hello"})

But the layer returned by get_channel_layer() is cached (in the channels.layers.channel_layers object), so if other threads are using its connections (like websockets), they will also be disconnected.

I tested this and it seems to work fine:

import channels.layers
from asgiref.sync import async_to_sync
from channels import DEFAULT_CHANNEL_LAYER

async def closing_send(channel, message):
    channel_layer = channels.layers.channel_layers.make_backend(DEFAULT_CHANNEL_LAYER)  # new, uncached layer
    await channel_layer.send(channel, message)
    await channel_layer.close_pools()

async_to_sync(closing_send)("test-channel", {"type": "hello"})

Did I get it right?
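The private-layer approach above can also be packaged as an async context manager, so the layer this call owns is closed even when `send()` raises, while the shared cached layer is never touched. A sketch under assumptions: `private_layer` and `FakeLayer` are hypothetical, `FakeLayer` makes no Redis connection, and with the real libraries the factory argument would be something like `lambda: channel_layers.make_backend(DEFAULT_CHANNEL_LAYER)`.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def private_layer(make_layer):
    """Hypothetical helper: yield a fresh, uncached layer and always close its pools."""
    layer = make_layer()
    try:
        yield layer
    finally:
        await layer.close_pools()  # only this private layer is closed


class FakeLayer:
    """Stand-in for a RedisChannelLayer instance; no Redis connection is made."""

    def __init__(self):
        self.sent = []
        self.closed = False

    async def send(self, channel, message):
        if self.closed:
            raise RuntimeError("layer already closed")
        self.sent.append((channel, message))

    async def close_pools(self):
        self.closed = True


shared = FakeLayer()  # plays the cached layer used by websocket consumers
created = []


def make_layer():
    created.append(FakeLayer())
    return created[-1]


async def closing_send(channel, message):
    async with private_layer(make_layer) as layer:
        await layer.send(channel, message)


asyncio.run(closing_send("test-channel", {"type": "hello"}))
print(created[0].closed, shared.closed)  # True False
```

Building a backend per call likely means opening new connections for every message, so this trades some throughput for not disturbing the shared layer.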

Same issue here:

Task exception was never retrieved
future: <Task finished name='Task-90' coro=<Connection.disconnect() done, defined at /usr/local/lib/python3.8/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.8/asyncio/streams.py", line 353, in close
    return self._transport.close()
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 692, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

dependencies:

channels==4.0.0
channels-redis==4.0.0
daphne==4.0.0
Django==3.2.16
django-redis==5.0.0
redis==4.3.4

code:

async_to_sync(channel_layer.group_send)(
    group_name,
    {"type": "store.manager.handout", "data": {"type": request.POST["message_type"]}}
)

This comment did not work either; I’m getting:

There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
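That message is what `asyncio.get_event_loop()` raises in a worker thread that never had an event loop set: outside the main thread it does not create one on demand. A stdlib-only sketch of the failure, plus one way to keep a long-lived loop per thread without relying on `get_event_loop()`. `thread_event_loop` and `_thread_state` are hypothetical names; the loop is deliberately left open so that anything created on it stays usable for later calls from the same thread, at the cost of never being closed explicitly.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

_thread_state = threading.local()  # hypothetical per-thread storage for our loop


def get_loop_naively():
    # In a worker thread with no loop set, this raises RuntimeError.
    return asyncio.get_event_loop()


def thread_event_loop():
    """Return a long-lived loop owned by the calling thread, creating it on first use."""
    loop = getattr(_thread_state, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        _thread_state.loop = loop  # deliberately left open for reuse
    return loop


async def which_loop():
    return asyncio.get_running_loop()


def run_twice():
    first = thread_event_loop().run_until_complete(which_loop())
    second = thread_event_loop().run_until_complete(which_loop())
    return first, second


with ThreadPoolExecutor(max_workers=1) as pool:
    try:
        pool.submit(get_loop_naively).result()
        naive_error = None
    except RuntimeError as exc:
        naive_error = str(exc)
    first, second = pool.submit(run_twice).result()

print(naive_error)
print(first is second)  # True
```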

I’m pretty sure the issue we have is very similar to what’s reported above. We call async_to_sync(channel_layer.group_send)(group_name, channel_layer_payload) from a sync gunicorn worker, and I think that’s where this stems from. The stack trace doesn’t go back to our code, though, so it’s hard to tell exactly.

I was looking at this just yesterday. Refactoring the two layers to share the same connection-handling code is a bit complex: the two implementations have a lot of differences and share very few elements, so it may take more time (or at least more concentration) to handle.

In the meantime I have prepared #347 to address this issue in RedisChannelLayer.

@btel That looks like the example, yes. As per https://github.com/django/channels_redis/issues/332#issuecomment-1403375145, I want to look into encapsulating that for the next set of releases.

@sevdog That would be nice. There’s no urgency though: little by little.

@carltongibson thank you for the suggestion. However, I am a bit concerned that forcing users to call this method when using async_to_sync is somewhat dirty, because the close_pools method is only defined in RedisChannelLayer and is not part of any specification.

Having to call .close_pools directly will be a problem when someone wants to move to another layer implementation (i.e. from RedisChannelLayer to RedisPubSubChannelLayer).

I believe RedisChannelLayer should have a mechanism like the one defined in the pub-sub layer to intercept the loop close event: https://github.com/django/channels_redis/blob/a7094c58a15cbf6e621b2129253060fc80cfdfad/channels_redis/pubsub.py#L15-L27

This way it will be transparent to users and more robust.
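A rough sketch of that idea with plain asyncio: replace a loop's `close()` with a wrapper that first runs a cleanup coroutine (for a layer, closing its pools) on that same loop and then performs the real close. This is modeled loosely on the linked pub-sub code and is not its actual implementation; `close_before_loop_shutdown` is a hypothetical name, and `asyncio.run()` stands in for `async_to_sync()` as the code that closes the loop.

```python
import asyncio


def close_before_loop_shutdown(loop, cleanup):
    """Hypothetical hook: run cleanup() on `loop` right before `loop` is closed."""
    original_close = loop.close

    def close_with_cleanup():
        try:
            if not loop.is_closed():
                # The loop is stopped but still open, so it can run one more coroutine.
                loop.run_until_complete(cleanup())
        finally:
            loop.close = original_close  # restore the real method
            original_close()

    loop.close = close_with_cleanup


calls = []


async def close_pools():
    calls.append(("close_pools", asyncio.get_running_loop()))


async def handler():
    # Register the hook on whichever loop runs this call.
    close_before_loop_shutdown(asyncio.get_running_loop(), close_pools)
    calls.append(("send", asyncio.get_running_loop()))


asyncio.run(handler())  # asyncio.run() calls loop.close() when it finishes
print([name for name, _ in calls])  # ['send', 'close_pools']
```

Because the cleanup is triggered by the loop itself, user code never has to call a layer-specific method, which is the transparency argued for above.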

Thanks @carltongibson I can confirm this fixes the problem