channels: Cannot access scope["user"] object from an async consumer

Traceback:

2018-04-13 17:44:51,382 - ERROR - server - Exception inside application: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
  File "/tmp/channels-examples/multichat/channels/consumer.py", line 54, in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
  File "/tmp/channels-examples/multichat/channels/utils.py", line 50, in await_many_dispatch
    await dispatch(result)
  File "/tmp/channels-examples/multichat/channels/consumer.py", line 67, in dispatch
    await handler(message)
  File "/tmp/channels-examples/multichat/channels/generic/websocket.py", line 173, in websocket_connect
    await self.connect()
  File "/tmp/channels-examples/multichat/chat/consumers.py", line 26, in connect
    if self.scope["user"].is_anonymous:
  File "/tmp/channels-examples/multichat/env/lib/python3.6/site-packages/django/utils/functional.py", line 215, in inner
    self._setup()
  File "/tmp/channels-examples/multichat/env/lib/python3.6/site-packages/django/utils/functional.py", line 349, in _setup
    self._wrapped = self._setupfunc()
  File "/tmp/channels-examples/multichat/channels/auth.py", line 142, in <lambda>
    scope["user"] = SimpleLazyObject(lambda: async_to_sync(get_user)(scope))
  File "/tmp/channels-examples/multichat/env/lib/python3.6/site-packages/asgiref/sync.py", line 34, in __call__
    "You cannot use AsyncToSync in the same thread as an async event loop - "
  You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

Using AuthMiddleware, accessing the scope["user"] object from an asynchronous consumer triggers this error.
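The failure mode in the traceback can be imitated without Channels installed. Below is a minimal stdlib-only sketch, not the real implementation: `sync_bridge` and `LazyUser` are hypothetical stand-ins for asgiref's `async_to_sync` and Django's `SimpleLazyObject`, and `get_user` here is a fake lookup rather than `channels.auth.get_user`. It only shows that a lazy attribute which blocks on a coroutine works outside an event loop and fails when first touched from inside one.

```python
import asyncio


async def get_user(scope):
    """Fake async user lookup (stand-in, not channels.auth.get_user)."""
    return scope.get("username", "anonymous")


def sync_bridge(coro_func):
    """Hypothetical sync wrapper that refuses to run inside a running loop."""
    def wrapper(*args):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, so starting one is safe.
            return asyncio.run(coro_func(*args))
        raise RuntimeError("cannot block on a coroutine inside the event loop thread")
    return wrapper


class LazyUser:
    """Resolves the user only when the attribute is first read."""
    def __init__(self, setup):
        self._setup = setup

    @property
    def name(self):
        return self._setup()


def build_scope():
    scope = {"username": "alice"}
    scope["user"] = LazyUser(lambda: sync_bridge(get_user)(scope))
    return scope


async def connect(scope):
    # Reading the lazy attribute from a coroutine calls the sync bridge
    # in the event loop thread, which raises.
    try:
        return scope["user"].name
    except RuntimeError as exc:
        return "error: %s" % exc


outside_loop = build_scope()["user"].name          # resolved from plain sync code
inside_loop = asyncio.run(connect(build_scope()))  # resolved from a coroutine
```

The first access succeeds because nothing is running in the thread; the second reports the error because the lookup is attempted from the loop's own thread.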

This has been happening since #923 was merged (released in Channels v2.1.0). The traceback can be reproduced with the following steps:

  • clone andrewgodwin/channels-examples and install dependencies
  • create a superuser
  • create a chatroom on the admin site
  • try to connect to that chatroom

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 15 (5 by maintainers)

Most upvoted comments

OK this is released in channels 2.1.1.

Just confirming that this is an error caused by the release of Channels 2.1, but it’s also correct to complain. I am investigating possible fixes now.

In the meantime, if this is happening in your own code, you can replace scope["user"] with await channels.auth.get_user(scope).
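A rough sketch of what that workaround looks like inside a consumer's connect method. To keep it runnable without Channels installed, `get_user` below is a stand-in coroutine and `ChatConsumerSketch` is a hypothetical consumer-like class; in a real project you would presumably import `get_user` from `channels.auth` and subclass one of the async consumers instead.

```python
import asyncio


async def get_user(scope):
    """Stand-in for channels.auth.get_user; the real one is awaited the same way."""
    await asyncio.sleep(0)  # pretend to hit the session store or database
    return scope.get("session_user")


class ChatConsumerSketch:
    """Hypothetical consumer-like class, not a real Channels consumer."""

    def __init__(self, scope):
        self.scope = scope
        self.state = None

    async def connect(self):
        # Instead of reading self.scope["user"], await the lookup directly
        # so nothing has to block inside the event loop thread.
        user = await get_user(self.scope)
        self.state = "rejected" if user is None else "accepted"
        return self.state


logged_in = asyncio.run(ChatConsumerSketch({"session_user": "alice"}).connect())
anonymous = asyncio.run(ChatConsumerSketch({}).connect())
```

The point is only that the user lookup is awaited where it is needed, rather than hidden behind a lazy attribute.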

I have this same issue when calling async_to_sync(channel_layer.send) from a Celery task with channels 3.0.4 and daphne 3.0.2.

I have a weird issue going on.

My packages

asgiref==3.2.1
celery==4.2.0
channels==2.2.0
channels-redis==2.4.0
daphne==2.3.0
Django==2.2.4
redis==3.3.7

This is my consumer class -

class JobsConsumer(AsyncJsonWebsocketConsumer):

    async def connect(self):
        await self.accept()

    async def disconnect(self, code):
        pass

    async def receive_json(self, content, **kwargs):
        print(content)
        if content:
            if content['action'] == "start_sec3":
                await self.start_sec3(job_name=content['job_name'])

    async def start_sec3(self, job_name):
        print("Start_sec3 called")
        log.debug("job Name=%s", job_name)
        # Save model to our database
        job = Job(
            name=job_name,
            status="started",
        )
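        # Pass the callable itself and await the wrapper;
        # job.save() would run synchronously in the event loop thread.
        await database_sync_to_async(job.save)()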

        print("Job saved")

        # Start long running task here (using Celery)
        print("Celery called")
        sec3_task = sec3.delay(job.id, self.channel_name)

        # Store the celery task id into the database if we wanted to
        # do things like cancel the task in the future
        job.celery_id = sec3_task.id
        await database_sync_to_async(job.save)()

        print("Celery id saved")

        # Tell client task has been started

        print("Telling to client")

        await self.send_json({
                "action": "started",
                "job_id": job.id,
                "job_name": job.name,
                "job_status": job.status,
            })

    async def celery_job(self, data):
        await self.send_json(data)

My Celery Task -

@shared_task
def sec3(job_id, reply_channel):
    # time sleep represent some long running process
    time.sleep(3)
    # Change task status to completed
    job = Job.objects.get(pk=job_id)
    log.debug("Running job_name=%s", job.name)
    print("Running job_name=%s" % job.name)

    job.status = "completed"
    job.completed = datetime.now()
    job.save()

    # Send status update back to browser client
    if reply_channel is not None:
        print('Sending completed status.')
        print(reply_channel)
        channel_layer = channels.layers.get_channel_layer()

        # Problem occurs here (Sometimes)
        async_to_sync(channel_layer.send)(reply_channel, {
                "type": 'celery_job',
                "action": "completed",
                "job_id": job.id,
                "job_name": job.name,
                "job_status": job.status,
            })

When tasks are added with a delay of 1-2 seconds between them, everything works perfectly, i.e. the job status is reported correctly through async_to_sync. But when tasks are added in rapid succession, async_to_sync fails to send the notification and raises this error:

"You cannot use AsyncToSync in the same thread as an async event loop - "

According to this answer on SO (https://stackoverflow.com/a/53544010/8137534), async_to_sync should check outer_loop.

I don’t understand this outer_loop check, or why this happens.
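For what it's worth, the "outer loop" idea can be illustrated with the standard library alone. This is a rough sketch of the kind of check being discussed, not asgiref's actual source: `has_outer_loop` is a hypothetical helper that reports whether the current thread already has a running event loop, which is the situation in which a blocking sync wrapper has to refuse to run. Whether a Celery worker thread actually ends up in that situation in the rapid-task case is not confirmed here.

```python
import asyncio
import threading


def has_outer_loop():
    """Return True if an event loop is already running in this thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def check_from_coroutine():
    # Called while asyncio.run() is driving a loop in this thread.
    return has_outer_loop()


def check_from_worker_thread():
    # A fresh thread has no running loop of its own.
    result = []
    worker = threading.Thread(target=lambda: result.append(has_outer_loop()))
    worker.start()
    worker.join()
    return result[0]


plain_call = has_outer_loop()
in_coroutine = asyncio.run(check_from_coroutine())
in_thread = check_from_worker_thread()
```

Only the call made from inside the coroutine sees a running loop; plain synchronous code and a separate worker thread do not, which is where blocking on a coroutine is safe.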

OK, this should be fixed on master now. I would appreciate it if someone else could verify that it is fixed, and then I’ll issue a bugfix release.

Note that you should make sure to install daphne>=2.1.1, so that you get the change there that runs application constructors in a threadpool (as I’ve had to move the user fetch to be synchronous at application construction time).
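To illustrate why running constructors in a threadpool helps: a worker thread has no running event loop, so synchronous code there can block on a coroutine safely. A minimal stdlib sketch under that assumption follows. `fetch_user`, `build_application` and `serve` are hypothetical names, and this is not Daphne's implementation.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def fetch_user(scope):
    """Fake async user lookup (stand-in for the real user fetch)."""
    await asyncio.sleep(0)
    return scope["username"]


def build_application(scope):
    # Synchronous "constructor". It runs in a worker thread, where no
    # event loop is running, so blocking on a coroutine is allowed.
    return {"user": asyncio.run(fetch_user(scope))}


async def serve(scope):
    # The server's loop hands the constructor off to a threadpool
    # instead of calling it directly in the event loop thread.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        return await loop.run_in_executor(pool, build_application, scope)


application = asyncio.run(serve({"username": "alice"}))
```

Calling `build_application` directly from `serve` would instead attempt the blocking call in the loop's own thread, which is the situation the original error message complains about.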