channels: Cannot call AsyncToSync twice in one sync context for channels_redis
I am using a SyncConsumer which adds itself to a group on websocket.connect with the group_add method of channel_layer.
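from asgiref.sync import AsyncToSync
from channels.generic.websocket import JsonWebsocketConsumer

class TaskConsumer(JsonWebsocketConsumer):
    def connect(self):
        self.accept()
        # Join the group so that group_send() messages reach this consumer.
        AsyncToSync(self.channel_layer.group_add)('task-i-1', self.channel_name)

    def task_message(self, event):
        # Handles group messages whose type is 'task.message'.
        self.send_json(event["text"])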
If I try to send a message to this group using the group_send method wrapped with AsyncToSync, the first message succeeds but further messages throw this exception:
>>> c = get_channel_layer()
>>> AsyncToSync(c.group_send)('task-i-1', {'type': 'task.message', 'text':{}})
>>> AsyncToSync(c.group_send)('task-i-1', {'type': 'task.message', 'text':{}})
Connection <RedisConnection [db:0]> has pending commands, closing it.
Traceback (most recent call last):
  File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/channels_redis/core.py", line 316, in group_send
    await connection.zremrangebyscore(key, min=0, max=int(time.time()) - self.group_expiry)
RuntimeError: Task <Task pending coro=<AsyncToSync.main_wrap() running at /home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/asgiref/sync.py:57> cb=[_run_until_complete_cb() at /usr/lib64/python3.6/asyncio/base_events.py:176]> got Future <Future pending> attached to a different loop
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/asgiref/sync.py", line 49, in __call__
    call_result.result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/asgiref/sync.py", line 57, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/channels_redis/core.py", line 320, in group_send
    await connection.zrange(key, 0, -1)
  File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredis/commands/__init__.py", line 152, in __exit__
    self._release_callback(conn)
  File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredis/pool.py", line 361, in release
    conn.close()
  File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredis/connection.py", line 352, in close
    self._do_close(ConnectionForcedCloseError())
  File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredis/connection.py", line 359, in _do_close
    self._writer.transport.close()
  File "/usr/lib64/python3.6/asyncio/selector_events.py", line 621, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib64/python3.6/asyncio/base_events.py", line 574, in call_soon
    self._check_closed()
  File "/usr/lib64/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
- OpenSuse Leap(42.2)
- python[3.6.4], django[2.0.2], channels[2.0.0], daphne[2.0.2], channels-redis[2.0.2], asgiref[2.1.3]
- Django running with runserver
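The "attached to a different loop" error in the first traceback can be reproduced with the standard library alone. The sketch below is an illustration of that class of error, not channels_redis code: the pending Future is a hypothetical stand-in for a resource (such as a pooled connection) that was created under one event loop and is then awaited from a task running on another.

```python
import asyncio


def reproduce_cross_loop_error():
    """Await a Future owned by one loop from a task running on another loop."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    # Stand-in for a resource that was created while loop_a was in use.
    pending = loop_a.create_future()

    async def use_resource():
        # The task driving this coroutine belongs to loop_b, not loop_a.
        await pending

    try:
        loop_b.run_until_complete(use_resource())
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop_a.close()
        loop_b.close()
    return None


message = reproduce_cross_loop_error()
print(message)
```

If each wrapped call runs on a fresh loop while a connection from the previous call is kept around, the second call would hit this situation, which would match the "first call works, second call fails" pattern reported here.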
About this issue
- State: closed
- Created 6 years ago
- Comments: 42 (21 by maintainers)
asgiref==3.5.2 channels==4.0.0 channels-redis==4.0.0
sometimes I got
No, still the same exception. I used the same consumer code and triggered it in the same way. It always fails on the second call.
I am using this workaround for now instead of AsyncToSync. I am not sure whether it is suitable for every case or an elegant solution.
Right, 2.0.2 will have fixed the issue for some as it removes some threading, but the basic issue still remains, and I’ll look into it soon.
Async programming is Very Hard to get right. I won’t be able to help with workarounds if that didn’t work, let me focus on getting this working this week so it’s rock-solid and maybe write up a document on how the hell async stuff works as you need a basic understanding of it to debug stuff.
I was able to solve this by explicitly creating the event loop. As the error says, there is no current event loop in a new thread.
Referenced from https://github.com/tornadoweb/tornado/issues/2308#issuecomment-372582005
Full working code should look like
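A minimal sketch of that idea using only the standard library (this is not the commenter's original code). The coroutine send_message is a hypothetical stand-in for an awaitable such as channel_layer.group_send. The worker thread first checks that it has no current event loop, then creates one explicitly and runs the coroutine on it.

```python
import asyncio
import threading


async def send_message(text):
    # Hypothetical stand-in for an awaitable such as channel_layer.group_send.
    await asyncio.sleep(0)
    return "sent: " + text


def worker(results):
    # A freshly started thread has no current event loop.
    try:
        asyncio.get_event_loop()
        results["had_loop"] = True
    except RuntimeError:
        results["had_loop"] = False

    # Create a loop explicitly and make it current for this thread.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        results["reply"] = loop.run_until_complete(send_message("hello"))
    finally:
        # Detach and close the loop so nothing keeps a reference to a closed loop.
        asyncio.set_event_loop(None)
        loop.close()


results = {}
thread = threading.Thread(target=worker, args=(results,))
thread.start()
thread.join()
print(results)
```

Whether this is enough for channels_redis depends on how the layer caches its connections. Anything created under one loop still has to be used under that same loop.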
Should be releasing itself now, I had forgotten to tag it.
Alright, I am reasonably sure I have fixed the other half of this in https://github.com/django/channels_redis/commit/f5e4799e11f472cc267598e9f78099a160f81550 - would appreciate people confirming things are fine for them (I can no longer make it crash).