channels: Testing with pytest-asyncio raises RuntimeError: Event loop is closed
Hi, this is a duplicate of this question.
I’m trying to test the new channels 2.0 with pytest-asyncio (0.8.0). If I place different assertions in the same function like:
import json

import pytest
from concurrent.futures._base import TimeoutError

from channels.testing import WebsocketCommunicator
from someapp.consumers import MyConsumer


@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
    path = 'foo'
    communicator = WebsocketCommunicator(MyConsumer, path)
    connected, subprotocol = await communicator.connect()
    assert connected
    return communicator


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1_and_2():
    communicator = await setup_database_and_websocket()
    sent = {"message": 'abc'}
    await communicator.send_json_to(sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })

    communicator = await setup_database_and_websocket()
    sent = {"message": 1}
    await communicator.send_json_to(sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })
then I’m not getting an error. But if I separate test cases like:
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1():
    communicator = await setup_database_and_websocket()
    sent = {"message": 'abc'}
    await communicator.send_json_to(sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_2():
    communicator = await setup_database_and_websocket()
    sent = {"message": 1}
    await communicator.send_json_to(sent)
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    await communicator.send_input({
        "type": "websocket.disconnect",
        "code": 1000,
    })
then I’m getting the following error upon the second receive_from call:
with pytest.raises(TimeoutError):
> await communicator.receive_from()
someapp/tests/test_consumers_async.py:106:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/generic/websocket.py:19: in websocket_connect
self.connect()
someapp/consumers.py:22: in connect
self.group_name, self.channel_name)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:49: in __call__
return call_result.result()
/usr/lib/python3.6/concurrent/futures/_base.py:432: in result
return self.__get_result()
/usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
raise self._exception
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:63: in main_wrap
result = await self.awaitable(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels_redis/core.py:290: in group_add
await connection.expire(group_key, self.group_expiry)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/commands/__init__.py:152: in __exit__
self._release_callback(conn)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/pool.py:361: in release
conn.close()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:352: in close
self._do_close(ConnectionForcedCloseError())
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:359: in _do_close
self._writer.transport.close()
/usr/lib/python3.6/asyncio/selector_events.py:621: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.6/asyncio/base_events.py:574: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=False>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
/usr/lib/python3.6/asyncio/base_events.py:357: RuntimeError
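The last frames of this traceback show a transport calling call_soon on a loop that reports closed=True. A minimal sketch of that final step in isolation, using only the standard library (the function name call_soon_on_closed_loop is made up for illustration and nothing here touches channels or aioredis):

```python
import asyncio


def call_soon_on_closed_loop():
    # Create a loop and close it right away, as if the test that owned it
    # had already finished.
    loop = asyncio.new_event_loop()
    loop.close()
    try:
        # Anything still holding a reference to the old loop and scheduling
        # a callback on it hits the same _check_closed() guard.
        loop.call_soon(print, "never runs")
    except RuntimeError as exc:
        return str(exc)
    return None


message = call_soon_on_closed_loop()
print(message)
```

This only reproduces the error message. It does not show which object kept the reference to the earlier loop, although the traceback suggests it is a pooled Redis connection being released.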
Also if I do (as in https://channels.readthedocs.io/en/latest/topics/testing.html):
await communicator.disconnect()
instead of:
await communicator.send_input({
    "type": "websocket.disconnect",
    "code": 1000,
})
then the following error is raised:
> await communicator.disconnect()
someapp/tests/test_consumers_async.py:96:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:100: in disconnect
await self.future
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <someapp.consumers.ChatConsumer object at 0x7f38fcc55240>
message = {'code': 1000, 'type': 'websocket.disconnect'}
def websocket_disconnect(self, message):
"""
Called when a WebSocket connection is closed. Base level so you don't
need to call super() all the time.
"""
# TODO: group leaving
> self.disconnect(message["code"])
E TypeError: disconnect() takes 1 positional argument but 2 were given
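The frame above calls self.disconnect(message["code"]), so this TypeError points at a disconnect method on the consumer that accepts no argument besides self. A minimal sketch of the mismatch with plain Python stand-ins (BaseConsumer, BrokenConsumer and FixedConsumer are hypothetical classes written for illustration, not the channels classes):

```python
class BaseConsumer:
    # Hypothetical stand-in that mirrors the call shown in the traceback.
    def websocket_disconnect(self, message):
        self.disconnect(message["code"])


class BrokenConsumer(BaseConsumer):
    # Accepts no close code, so the call above passes one argument too many.
    def disconnect(self):
        pass


class FixedConsumer(BaseConsumer):
    def __init__(self):
        self.closed_with = None

    # Accepting the close code matches how the base class calls the method.
    def disconnect(self, close_code):
        self.closed_with = close_code


message = {"type": "websocket.disconnect", "code": 1000}

try:
    BrokenConsumer().websocket_disconnect(message)
    broken_error = None
except TypeError as exc:
    broken_error = str(exc)

fixed = FixedConsumer()
fixed.websocket_disconnect(message)
print(broken_error)
print(fixed.closed_with)
```

If the real consumer in someapp/consumers.py defines disconnect(self) without a second parameter, adding one would be the corresponding change. That is an assumption, since the consumer code is not shown here.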
What should I do to separate those test cases in the respective individual test functions?
About this issue
- State: closed
- Created 6 years ago
- Comments: 38 (17 by maintainers)
I have pushed up commits to the master branch of both asgiref and channels. If you could pull those down and install them both, that should fix the issue and I can close this (your consumer is throwing another error, I presume; the problem here was that error handling skipped cleanup for all the async stuff and left it in a mess).
Alright, I’ve managed to replicate it locally with that, thanks. Will investigate and fix soon.
Well, the actual bug here is waaaay back and is this:
RuntimeError: Two event loops are trying to receive() on one channel layer at once!
The question is, why. I presume what is happening is a consumer is still living on in the background (and trying to receive) as the next test is coming around.
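One way a consumer can outlive its test is when the test body fails before it reaches the disconnect step. A minimal sketch, using plain asyncio rather than channels, of cancelling a background task in a finally block so that cleanup still runs after an error. The names fake_consumer, failing_body and run_and_cleanup are hypothetical and exist only for this illustration:

```python
import asyncio


async def fake_consumer(inbox):
    # Hypothetical stand-in for a consumer that keeps receiving forever.
    while True:
        await inbox.get()


async def failing_body(inbox):
    # Hypothetical test body that fails before it gets a chance to disconnect.
    await inbox.put("hello")
    raise ValueError("simulated failure")


async def run_and_cleanup(body):
    inbox = asyncio.Queue()
    task = asyncio.ensure_future(fake_consumer(inbox))
    error = None
    try:
        await body(inbox)
    except ValueError as exc:
        error = exc
    finally:
        # Cancel the background task whether or not the body raised.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    return task.cancelled(), error


loop = asyncio.new_event_loop()
try:
    cancelled, error = loop.run_until_complete(run_and_cleanup(failing_body))
finally:
    loop.close()
print(cancelled, error)
```

In a real test the equivalent would be making sure the communicator is disconnected even when an assertion fails, for example from a finally block or a fixture teardown.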
Do all your tests that use the Communicator end with WebsocketCommunicator.disconnect()?
or something with how Redis is talking to channels…
or
should be good enough.
Can you try to use channels.layers.InMemoryChannelLayer if you haven’t already?
Yes: