channels: WARNING - server - Application instance took too long to shut down and was killed

The first message succeeds, but further messages produce this warning:

2018-08-09 15:25:32,937 - WARNING - server - Application instance <Task pending coro=<__call__() running at /home/soham/PycharmProjects/lead-generation/venv/lib/python3.5/site-packages/channels/sessions.py:175> wait_for=<Future pending cb=[Task._wakeup()]>> for connection <WebSocketProtocol client=['127.0.0.1', 35996] path=b'/lead/'> took too long to shut down and was killed.

My consumer class

class LeadConsumer(AsyncWebsocketConsumer):
    async def websocket_connect(self, event):
        print("Connected", event)
        self.room_group_name = 'lead_list'

        # join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()

    async def websocket_receive(self, event):
        print("Receive", event)
        await self.send({
            "type": "websocket.send",
            "text": "From receive..."
        })

    async def websocket_disconnect(self, event):
        print("Disconnect", event)
        await self.send({
            "type": "websocket.close"
        })

    async def lead_list(self, event):
        _contact = event['contact']
        _campaign = event['campaign']
        _company = event['company']
        _contact_person = event['contact_person']

        await self.send(text_data=json.dumps({
            'contact': _contact,
            'campaign': _campaign,
            'contact_person': _contact_person,
            'company': _company
        }))

requirements.txt

aioredis==1.1.0
asgiref==2.3.2
async-timeout==2.0.1
attrs==18.1.0
autobahn==18.7.1
Automat==0.7.0
certifi==2018.4.16
channels==2.1.2
channels-redis==2.2.1
chardet==3.0.4
constantly==15.1.0
daphne==2.2.1
diff-match-patch==20121119
Django==2.0.7
django-import-export==1.0.1
django-rest-framework==0.1.0
djangorestframework==3.8.2
docutils==0.14
et-xmlfile==1.0.1
hiredis==0.2.0
hyperlink==18.0.0
idna==2.7
incremental==17.5.0
jdcal==1.4
msgpack==0.5.6
odfpy==1.3.6
openpyxl==2.5.4
PyHamcrest==1.9.0
python-decouple==3.1
pytz==2018.5
PyYAML==3.13
requests==2.19.1
six==1.11.0
tablib==0.12.1
Twisted==18.7.0
txaio==18.7.1
unicodecsv==0.14.1
urllib3==1.23
xlrd==1.1.0
xlwt==1.3.0
zope.interface==4.5.0

How can I solve this issue?
Thanks in advance.

About this issue

  • State: open
  • Created 6 years ago
  • Reactions: 4
  • Comments: 36 (2 by maintainers)

Most upvoted comments

In my particular case, asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0, channels 2.4.0, django 3.0.10 and djangorestframework 3.11.1.

In my case, the problem was with Redis.

Check whether there are TCP connections in the CLOSE_WAIT state:

netstat | grep 6379 | fgrep -c CLOSE_WAIT

In my case I had more than 50 connections in that state. I had to update Redis.
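
The same check can be sketched in Python if you want to run it from a script or a health check. This is only an illustration: the function names, the `netstat -tn` invocation and the SAMPLE text are assumptions for the example, not part of the original comment, and the netstat column layout can differ between platforms.

```python
import subprocess

# Made-up sample of netstat-style output, used only to illustrate the parsing.
SAMPLE = """\
tcp        0      0 127.0.0.1:51234         127.0.0.1:6379          CLOSE_WAIT
tcp        0      0 127.0.0.1:51236         127.0.0.1:6379          ESTABLISHED
tcp        1      0 127.0.0.1:51240         127.0.0.1:6379          CLOSE_WAIT
tcp        0      0 127.0.0.1:40000         127.0.0.1:5432          CLOSE_WAIT
"""


def count_close_wait(netstat_output, port=6379):
    """Count connections on `port` whose state column is CLOSE_WAIT."""
    suffix = f":{port}"
    count = 0
    for line in netstat_output.splitlines():
        fields = line.split()
        if not fields or fields[-1] != "CLOSE_WAIT":
            continue
        # Either the local or the remote address may carry the Redis port.
        if any(field.endswith(suffix) for field in fields[:-1]):
            count += 1
    return count


def count_close_wait_on_system(port=6379):
    """Run netstat (assumed to be installed) and count CLOSE_WAIT sockets."""
    result = subprocess.run(
        ["netstat", "-tn"], capture_output=True, text=True, check=True
    )
    return count_close_wait(result.stdout, port)


print(count_close_wait(SAMPLE))
```

A count that keeps growing suggests connections to Redis are being closed by one side but never released by the other.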

I managed to resolve this issue in local dev without needing to use both wsgi and asgi servers together (channels==3.0.4, asgiref==3.5.0, Django==4.0.3). I tried a combination of:

  1. Using Reconnecting WebSockets on the client side, as suggested by destelio. For example:
const options = {
  WebSocket: WebSocket,
  connectionTimeout: 10000,
  maxRetries: 3,
  minUptime: 2000,
  maxEnqueuedMessages: 1000000,
};

const newSocket = new ReconnectingWebSocket(
  'ws://' +
  window.location.host +
  '/ws/somepath/' +
  roomName +
  '/' +
  '?' +
  queryString,
  [],
  options
);
  2. Setting thread_sensitive to False globally, and setting it to True manually for ORM operations, i.e. using @sync_to_async(thread_sensitive=True) as a decorator for sync functions that write to the database.

  3. If you have a connect() method that involves long-running tasks, call await self.accept() as early as possible within the method, right below await self.channel_layer.group_add().

Initially, this worked fine for connection timeouts. However, in my particular case, my disconnect method requires a relatively lengthy ORM operation involving celery workers. Therefore, after switching to a remote database, the disconnect method became an issue once again.

Fortunately, there is a simple workaround.

Instead of using the development server (i.e. python manage.py runserver), run it in production mode with Daphne. That way, you can specify a timeout interval via the --application-close-timeout flag.

Simply run:

daphne -t 60 --application-close-timeout 60 your_project.asgi:application

The -t flag is for --http-timeout. It may not be necessary.

Of course, with this approach, you will need to serve any local static files through STATIC_ROOT. Whitenoise can be used to serve static files. It will work with an ASGI server (within Django anyway; outside of Django I don’t think it supports ASGI). If you don’t want file caching, replace their suggested STATICFILES_STORAGE with:

STATICFILES_STORAGE = 'whitenoise.storage.CompressedStaticFilesStorage'

Hope this helps someone. Thanks to everyone for such an awesome project🙏 💯

I’m going to mention this here as it may help others. In my case the problem was just a silly mistake. In the code I was maintaining, websocket_disconnect was defined without a call to super(), so StopConsumer() was never raised. The right way is to override the disconnect method, which AsyncWebsocketConsumer calls from inside websocket_disconnect. Simply not overriding websocket_disconnect solved my problem.

The original example above has the same pattern. However, I am not sure whether that was already the case back in 2018 or whether this is a newer style in Channels’ code.
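
A minimal sketch of the control flow described in this comment, using simplified stand-ins rather than the real Channels classes. StopConsumer, BaseConsumer, BrokenConsumer, FixedConsumer and stops_cleanly below are hypothetical names written only for illustration; they imitate the behaviour described above and are not Channels' actual source.

```python
import asyncio


class StopConsumer(Exception):
    """Stand-in for channels.exceptions.StopConsumer (assumption)."""


class BaseConsumer:
    """Simplified imitation of a generic websocket consumer."""

    async def websocket_disconnect(self, message):
        # The low-level handler calls the user hook, then signals shutdown.
        await self.disconnect(message["code"])
        raise StopConsumer()

    async def disconnect(self, code):
        pass


class BrokenConsumer(BaseConsumer):
    # Overrides the low-level handler without calling super(),
    # so StopConsumer is never raised and the instance never exits.
    async def websocket_disconnect(self, message):
        print("Disconnect", message)


class FixedConsumer(BaseConsumer):
    # Overrides only the high-level hook; the base class still raises StopConsumer.
    async def disconnect(self, code):
        print("Disconnect", code)


async def stops_cleanly(consumer):
    """Return True if the consumer signals shutdown on disconnect."""
    message = {"type": "websocket.disconnect", "code": 1000}
    try:
        await consumer.websocket_disconnect(message)
    except StopConsumer:
        return True
    return False


print(asyncio.run(stops_cleanly(BrokenConsumer())))
print(asyncio.run(stops_cleanly(FixedConsumer())))
```

In a real consumer the equivalent change would be to rename websocket_connect, websocket_receive and websocket_disconnect to connect, receive and disconnect, and to drop the explicit websocket.close message.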

This means something in your consumer is not exiting after the disconnect (you don’t need to send websocket.close, so get rid of that) - it’s not a problem in Channels from what I can tell.

If you can trace it down to a clear problem in channels with steps to reproduce from a fresh project, please re-open the bug - you can see more about where to ask questions at http://channels.readthedocs.io/en/latest/support.html

“In my particular case, asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0, channels 2.4.0, django 3.0.10 and djangorestframework 3.11.1.”

You saved me from a lot of trouble. May I ask why asgiref is the issue and how you found it?

@tinylambda thanks for the extra info. Not sure yet if this is a usage issue we should document or something else, but I will reopen to investigate

@carltongibson Thanks for the reply. In my use case, I want to use django-channels as an access layer and keep sending new game state to the user, either at a fixed interval (every 1 second) or on state changes, from a Redis server. I think this should be a typical use case for websockets.

If the coroutine handling a message (here chat_message) takes too long to complete (to broadcast messages, for example), it blocks the processing of every later message of the same type until the previous coroutine completes (which may be never).

As a potential solution, how about dispatching every handler coroutine (chat_message) as a task (loop.create_task), tracking every task instance in a central place (per consumer instance), checking the tasks at some point (when and how?) to clear completed ones, and cancelling all remaining active tasks when the websocket disconnects?

We can add a MAX_ACTIVE_TASKS setting to limit how many tasks can be created, to avoid creating too many slow tasks.

With MAX_ACTIVE_TASKS=1, we can use the django-channels server as a broadcast service. With MAX_ACTIVE_TASKS > 1, we can process more messages of the same type concurrently.

It’s comparable: creating more tasks to handle some type of message in a consumer instance (for one stateful connection) is like creating more threads or processes to handle user requests in an HTTP web server (for all stateless user requests).

I implemented a consumer that dispatches message handlers as tasks (this only affects user-defined handlers):

import asyncio
import copy
import collections
import functools
import json
from channels.consumer import get_handler_name
from channels.generic.websocket import AsyncWebsocketConsumer


class ChatConsumer(AsyncWebsocketConsumer):
    MAX_ACTIVE_TASKS = 2

    def __init__(self, *args, **kwargs):
        super(ChatConsumer, self).__init__(*args, **kwargs)
        self.handler_tasks = collections.defaultdict(list)
        self.joined_groups = set()

        self.room_name = None
        self.room_group_name = None

    def complete_task(self, task_instance, handler_name):
        print(f'Complete task for handler {handler_name}, task instance {task_instance}')
        self.handler_tasks[handler_name].remove(task_instance)
        print(
            f'There are still {len(self.handler_tasks[handler_name])} active tasks for'
            f' handler {handler_name}'
        )

    async def dispatch(self, message):
        handler_name = get_handler_name(message)
        handler = getattr(self, handler_name, None)
        if handler:
            if handler_name.startswith('chat_'):
                # Create a task to process message
                loop = asyncio.get_event_loop()
                if len(self.handler_tasks[handler_name]) >= self.MAX_ACTIVE_TASKS:
                    await self.send(text_data=json.dumps({
                        'message': 'MAX_ACTIVE_TASKS reached'
                    }))
                else:
                    handler_task = loop.create_task(handler(message))
                    # don't forget to remove the task from self.handler_tasks
                    # when task completed
                    handler_task.add_done_callback(
                        functools.partial(self.complete_task, handler_name=handler_name)
                    )
                    self.handler_tasks[handler_name].append(handler_task)
            else:
                # The old way to process message
                await handler(message)
        else:
            raise ValueError("No handler for message type %s" % message["type"])

    async def clear_handler_tasks(self):
        for handler_name in self.handler_tasks:
            task_instances = self.handler_tasks[handler_name]
            for task_instance in task_instances:
                task_instance.cancel()
                # try:
                #     await task_instance
                # except asyncio.CancelledError:
                #     print('Cancelled handler task', task_instance)

    async def disconnect(self, code):
        joined_groups = copy.copy(self.joined_groups)
        for group_name in joined_groups:
            await self.leave_group(group_name)
        self.joined_groups.clear()
        await self.clear_handler_tasks()

    async def leave_group(self, group_name):
        await self.channel_layer.group_discard(
            group_name, self.channel_name
        )
        self.joined_groups.remove(group_name)

    async def join_group(self, group_name):
        await self.channel_layer.group_add(
            group_name, self.channel_name
        )
        self.joined_groups.add(group_name)

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        await self.join_group(self.room_group_name)
        await self.accept()

    async def receive(self, text_data=None, bytes_data=None):
        text_json = json.loads(text_data)
        message = text_json['message'].strip()
        if message.endswith('1'):

            await self.channel_layer.group_send(self.room_group_name, {
                'type': 'chat_message',
                'message': message,
            })
        elif message.endswith('2'):
            await self.channel_layer.group_send(self.room_group_name, {
                'type': 'chat_message2',
                'message': message,
            })
        else:
            await self.send(text_data=json.dumps({
                'message': 'invalid data'
            }))

    async def chat_message(self, event):
        message = event['message']

        while True:
            print('sending...')
            await self.send(text_data=json.dumps({
                'message': message
            }))
            await asyncio.sleep(1)

    async def chat_message2(self, event):
        message = event['message']
        await self.send(text_data=json.dumps({
            'message': message
        }))

In this implementation, you can send a message such as “g1” to start the long-running handler (chat_message), and you can still send “g2” to start the chat_message2 handler. If you send other data that does not end with 1 or 2, you will receive an “invalid data” reply.

Client input and output:

(venv) localhost:light Felix$ python manage.py chat_client
connected
g1
Received:  {"message": "g1"}
gReceived:  {"message": "g1"}
Received:  {"message": "g1"}
2
Received:  {"message": "g2"}
Received:  {"message": "g1"}
g2
Received:  {"message": "g2"}
Received:  {"message": "g1"}
g2
Received:  {"message": "g2"}
Received:  {"message": "g1"}
g1
Received:  {"message": "MAX_ACTIVE_TASKS reached"}
Received:  {"message": "g1"}
gReceived:  {"message": "g1"}
Received:  {"message": "g1"}
g1
Received:  {"message": "MAX_ACTIVE_TASKS reached"}
Received:  {"message": "g1"}
gReceived:  {"message": "g1"}
x
Received:  {"message": "invalid data"}
Received:  {"message": "g1"}
gxReceived:  {"message": "g1"}

Received:  {"message": "invalid data"}
Received:  {"message": "g1"}
Received:  {"message": "g1"}
^CDone

Server input and output:


Django version 3.1.5, using settings 'light.settings'
Starting ASGI/Channels version 3.0.3 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
WebSocket HANDSHAKING /ws/chat/cc/ [127.0.0.1:57667]
WebSocket CONNECT /ws/chat/cc/ [127.0.0.1:57667]
sending...
sending...
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-17' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-23' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
Complete task for handler chat_message2, task instance <Task finished name='Task-29' coro=<ChatConsumer.chat_message2() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:116> result=None>
There are still 0 active tasks for handler chat_message2
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
sending...
WebSocket DISCONNECT /ws/chat/cc/ [127.0.0.1:57667]
Complete task for handler chat_message, task instance <Task cancelled name='Task-11' coro=<ChatConsumer.chat_message() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:106>>
There are still 0 active tasks for handler chat_message
Cancelled handler task <Task cancelled name='Task-11' coro=<ChatConsumer.chat_message() done, defined at /Users/Felix/PycharmProjects/light/chat/consumers.py:106>>

It’s a full-duplex websocket consumer now: we can send data to the websocket server at any time and get a response, and the websocket server can send data at any time too. The long-running handler no longer blocks the consumer in the new implementation.

So, are you interested in implementing this at the Channels framework level? @carltongibson @andrewgodwin

Thanks @Saran33, I’ll take a look at those.

Looking back on this thread, in the case of blocking I/O or CPU intensive operations during a disconnect method, it might be worth offloading those tasks to a worker, with something like celery or RQ.

That was my thought too. I was considering using Celery, but Celery isn’t meant to be a real-time queue, and more importantly, if you offload the task to Celery you would have a hard(er) time pushing the result back to the channels consumer.

I guess you could submit the task and then continually poll it, waiting for it to be complete, but I didn’t like that approach.

Basically, what I did was create a Queue and Thread:

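import json
from queue import Queue
from threading import Event, Thread

from channels.generic.websocket import WebsocketConsumer


class EditorConsumer(WebsocketConsumer):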

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        ...
        self.requests_queue = Queue()
        self.requests_thread = None
        self.stop_event = Event()

    def connect(self):
        self.requests_thread = Thread(
            target=self.process_requests,
            daemon=True
        )
        self.requests_thread.start()

        ...

        self.accept() 

    def disconnect(self, code):
        self.stop_event.set()

    def receive(self, text_data=None, bytes_data=None):
        self.requests_queue.put(text_data)

    def process_requests(self):
        while not self.stop_event.is_set():
            text_data = self.requests_queue.get()

            ...

            # NOTE: It's worth checking if the `stop_event` is set here. If you try to call
            # `self.send` afterwards, you're likely going to error out
            if not self.stop_event.is_set():
                # send the response back to the client
                self.send(text_data=json.dumps(output_data))

Using this setup, disconnect is allowed to run and raise the StopConsumer exception. The process_requests daemon thread is left to finish whatever item it is currently working on in the queue, then it exits, regardless of whether the queue is empty.
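
One possible variation on the worker loop, shown here as a standalone sketch rather than as part of the consumer. A plain Queue.get() blocks while the queue is empty, so the loop only re-checks the stop event after the next item arrives; polling with a timeout lets the thread notice the event even when nothing is queued. The handle and send callables and the poll_interval parameter are hypothetical stand-ins for the elided processing step and for self.send.

```python
import queue
import threading


def process_requests(requests_queue, stop_event, handle, send, poll_interval=0.1):
    """Worker loop that exits soon after stop_event is set."""
    while not stop_event.is_set():
        try:
            text_data = requests_queue.get(timeout=poll_interval)
        except queue.Empty:
            # Nothing queued: loop back and re-check stop_event.
            continue
        output = handle(text_data)
        # Skip the send if the connection was closed while we were working.
        if not stop_event.is_set():
            send(output)


if __name__ == "__main__":
    q = queue.Queue()
    stop = threading.Event()
    worker = threading.Thread(
        target=process_requests,
        args=(q, stop, str.upper, print),
        daemon=True,
    )
    worker.start()
    q.put("hello")
    stop.set()
    worker.join(timeout=1)
```

The trade-off is a small amount of idle wake-up work per connection in exchange for a bounded shutdown time.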

It’s probably not the most “elegant” way to solve the problem, but it keeps the solution entirely inside the consumers.py file without having to involve Celery, Redis, etc., which has additional overhead (and again, Celery isn’t meant to be real-time, which IMO, doesn’t make it the right choice when working with websockets).

Edit: If my logic/thought process about not using Celery here is incorrect I would love to know.

I am not sure if my issue is the same, but it results in a similar error. In my case, I am running my Django Channels ASGI app using Daphne, and I have a couple API calls that call an external API. I found that when that external API errored out and I did not have a timeout set on the requests call, it would hang the Daphne server. While I did have a flaw in my code, I do not think that such a flaw should render the entire server unresponsive and not able to be restarted through systemctl. Note that when I switched to uvicorn, I no longer had an issue even with the misbehaving code. In my real application, I have of course fixed my external API calls to have an explicit timeout. Here is a simplified repro case based on my real application: https://github.com/jessamynsmith/daphne_hang/

Here is my exact error:

2023-02-14 05:01:42,735 WARNING Application instance <Task pending name='Task-6' coro=<ProtocolTypeRouter.__call__() running at /Users/jessamynsmith/Development/daphne_hang/venv/lib/python3.11/site-packages/channels/routing.py:62> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/jessamynsmith/.pyenv/versions/3.11.0/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>> for connection <WebRequest at 0x108603890 method=GET uri=/api/v1/hang/ clientproto=HTTP/1.1> took too long to shut down and was killed.
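
For illustration, here is a minimal sketch of an outbound call with an explicit timeout. It uses the standard library's urllib instead of the requests library mentioned above (with requests, the equivalent is the timeout= argument to requests.get); the function name fetch_status and the 5-second default are assumptions for the example.

```python
import socket
import urllib.error
import urllib.request


def fetch_status(url, timeout=5.0):
    """Return the HTTP status code, or None if the call fails or times out."""
    try:
        # Without `timeout`, a server that accepts the connection but never
        # answers would block this call indefinitely.
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status
    except (urllib.error.URLError, socket.timeout):
        # Connection errors, HTTP error statuses and timeouts all end up here.
        return None
```

Bounding every external call this way keeps one unresponsive upstream service from holding a request open until the server kills the application instance.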

“In my particular case, asgiref==3.3.1 was the culprit. I rolled back to 3.2.10 and it works well! I have daphne 2.5.0, channels 2.4.0, django 3.0.10 and djangorestframework 3.11.1.”

This solved the issue for us. The app had started timing out requests and emitting those warnings.

Pinning versions with pip freeze seems to be a must.