channels: Cannot call AsyncToSync twice in one sync context for channels_redis

I am using a SyncConsumer which adds itself to a group on websocket.connect with group_add method of channel_layer.

class TaskConsumer(JsonWebsocketConsumer):
    def connect(self):
        self.accept()
        AsyncToSync(self.channel_layer.group_add)('task-i-1', self.channel_name)

    def task_message(self, event):
        self.send_json(event["text"])

If I try to send message to this group using group_send method that is wrapped with AsyncToSync, first message is succeed but further messages throw this exception:

>>> c = get_channel_layer() >>> AsyncToSync(c.group_send)('task-i-1', {'type': 'task.message', 'text':{}}) >>> AsyncToSync(c.group_send)('task-i-1', {'type': 'task.message', 'text':{}}) Connection <RedisConnection [db:0]> has pending commands, closing it. Traceback (most recent call last): File “/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/channel s_redis/core.py”, line 316, in group_send await connection.zremrangebyscore(key, min=0, max=int(time.time()) - self.group_expiry) RuntimeError: Task <Task pending coro=<AsyncToSync.main_wrap() running at /home/ahmet/webso cket_channel/env/lib64/python3.6/site-packages/asgiref/sync.py:57> cb=[_run_until_complete_cb() at /usr/lib64/python3.6/asyncio/base_events.py:176]> got Future <Future pe nding> attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File “<console>”, line 1, in <module> File “/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/asgiref /sync.py”, line 49, in call call_result.result() File “/usr/lib64/python3.6/concurrent/futures/_base.py”, line 425, in result return self.__get_result() File “/usr/lib64/python3.6/concurrent/futures/_base.py”, line 384, in __get_result raise self._exception File “/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/asgiref /sync.py”, line 57, in main_wrap result = await self.awaitable(*args, **kwargs) File “/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/channel s_redis/core.py”, line 320, in group_send await connection.zrange(key, 0, -1) File “/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredi s/commands/init.py”, line 152, in exit self._release_callback(conn) File “/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredi s/pool.py”, line 361, in release conn.close() File “/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredi s/connection.py”, line 352, in close self._do_close(ConnectionForcedCloseError()) File “/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredi s/connection.py”, line 359, in _do_close self._writer.transport.close() File “/usr/lib64/python3.6/asyncio/selector_events.py”, line 621, in close self._loop.call_soon(self._call_connection_lost, None) File “/usr/lib64/python3.6/asyncio/base_events.py”, line 574, in call_soon self._check_closed() File “/usr/lib64/python3.6/asyncio/base_events.py”, line 357, in _check_closed raise RuntimeError(‘Event loop is closed’) RuntimeError: Event loop is closed

  • OpenSuse Leap(42.2)
  • python[3.6.4], django[2.0.2], channels[2.0.0], daphne[2.0.2], channels-redis[2.0.2]. asgiref[2.1.3]
  • Django running with runserver

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 42 (21 by maintainers)

Most upvoted comments

shell asgiref==3.5.2 channels==4.0.0 channels-redis==4.0.0

Task exception was never retrieved
future: <Task finished name='Task-378' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
Task exception was never retrieved
future: <Task finished name='Task-381' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-381' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-381' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
Task exception was never retrieved
future: <Task finished name='Task-382' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-382' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-382' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop

sometimes I got

Task exception was never retrieved
future: <Task finished name='Task-348' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 338, in close
    return self._transport.close()
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 698, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 750, in call_soon
    self._check_closed()
  File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task exception was never retrieved
future: <Task finished name='Task-349' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 338, in close
    return self._transport.close()
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 698, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 750, in call_soon
    self._check_closed()
  File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

No, still same exception. I used same block code for consumer and triggered in a same way. It always gets failed in the second time. Traceback

I am using this workaround for now instead of AsyncToSync. I am not sure if it is suitable for every case or elegant solution.

loop = asyncio.get_event_loop()
coroutine = channel_layer.group_send(
    group_name,
    {
        'type': 'task.message',
        'text': context
    })
loop.run_until_complete(coroutine)

Right, 2.0.2 will have fixed the issue for some as it removes some threading, but the basic issue still remains, and I’ll look into it soon.

Async programming is Very Hard to get right. I won’t be able to help with workarounds if that didn’t work, let me focus on getting this working this week so it’s rock-solid and maybe write up a document on how the hell async stuff works as you need a basic understanding of it to debug stuff.

i was able to solve this by explicitly created it As it says, there is no current event loop in a new thread.

referenced from https://github.com/tornadoweb/tornado/issues/2308#issuecomment-372582005

full working code should look like

        try:
            # Send message to room group
            asyncio.set_event_loop(asyncio.new_event_loop())
            loop = asyncio.get_event_loop()
            channel_layer = get_channel_layer()
            coroutine = channel_layer.group_send(
            'channel_group',
            {
                'type': 'task_message',
                'message': 'text'
            })
            loop.run_until_complete(coroutine)
        except Exception as e:
            print(e)

Should be releasing itself now, I had forgotten to tag it.

Alright, I am reasonably sure I have fixed the other half of this in https://github.com/django/channels_redis/commit/f5e4799e11f472cc267598e9f78099a160f81550 - would appreciate people confirming things are fine for them (I can no longer make it crash).