channels_redis: Memory leak when using channels redis PubSub layer

Hello! On our production service, we use channels and channels_redis to deliver heavy updates to WebSocket clients, and it has worked fine for us for the last six months. However, since the day we switched to the new PubSubChannelLayer, memory consumption has grown without bound. The only workaround we have found is restarting the server process at a fixed interval to release the memory.

Here is some metadata about the system we are running:

  • Socket updates are mostly generated by management commands that run outside the server processes and call group_send to provide consumers with new updates.
  • channels==3.0.4
  • channels-redis==3.3.0
  • The server runs under an Nginx + uvicorn==0.12.1 stack with supervisor as the process manager. We have also tested gunicorn and daphne, and the memory problem stayed the same.
  • OS: Ubuntu Server 18

About this issue

  • State: open
  • Created 3 years ago
  • Reactions: 10
  • Comments: 24 (12 by maintainers)

Most upvoted comments

bump

@Sharpek @pbylina how about you guys spend an ounce of energy on this, instead of posting unhelpful comments.

Any progress here, maybe?

@acu192 Would you like to investigate making that change in django/channels?

Yes, I’ll put some time on my calendar in the next few weeks to create a PR to django/channels.

@fosterseth Thanks for investigating this. Your discoveries got me thinking, and now I have some ideas on what is going wrong here.

First, some context:

The giant comment I wrote here is because django/channels calls new_channel() (here), but it doesn’t offer an obvious way to “clean up” that new channel. Thus the only way I could figure out to clean up (i.e. call del self.channels[channel]) was to wait for a CancelledError in receive() (here) and use that opportunity to clean stuff up.

So, what’s going wrong:

@fosterseth I think you’ve found an execution path where CancelledError is never thrown into receive(), thus the PubSub channel layer never gets to clean up. Looking at how that might happen… perhaps if an exception is thrown here, then the channel layer’s receive() is never rescheduled, thus is never canceled. But… whatever the reason… I think we cannot solve this without changing django/channels, which is not too surprising and was the reason I wrote that original big comment. Basically, I believe we’re doing the best we can in PubSub to clean up given that django/channels doesn’t give us a chance to clean up.

How to fix:

I think we should add code (perhaps here) … something like:

if self.channel_layer is not None and hasattr(self.channel_layer, 'clean_channel'):
    self.channel_layer.clean_channel(self.channel_name)

That is, clean_channel() will clean up anything done by new_channel(), which gives us a cleanup opportunity that is guaranteed to be called.

Then of course we’d change the PubSub impl to clean up in clean_channel() instead of how it does it now.
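To make the proposal concrete, here is a minimal sketch of the idea using a toy layer. Everything in it is hypothetical (ToyLayer, run_consumer, failing_handler); only the names new_channel and clean_channel come from the discussion above, and this is not the actual django/channels or channels_redis code. It only shows that a cleanup hook called from a finally block still runs when the handler raises.

```python
import asyncio


class ToyLayer:
    """Hypothetical stand-in for a channel layer (not channels_redis code)."""

    def __init__(self):
        self.channels = {}
        self._counter = 0

    async def new_channel(self):
        # Allocate per-channel state, as a real layer would.
        self._counter += 1
        name = f"toy.{self._counter}"
        self.channels[name] = asyncio.Queue()
        return name

    def clean_channel(self, name):
        # Undo whatever new_channel() set up; harmless if called twice.
        self.channels.pop(name, None)


async def run_consumer(layer, handler):
    """Hypothetical consumer lifecycle with a guaranteed cleanup hook."""
    name = await layer.new_channel()
    try:
        await handler(name)
    finally:
        # Same guard as in the proposal: only call the hook if it exists.
        if layer is not None and hasattr(layer, "clean_channel"):
            layer.clean_channel(name)


async def failing_handler(name):
    # Stands in for a send() that fails because the client went away.
    raise RuntimeError("Attempt to send on a closed protocol")


async def demo():
    layer = ToyLayer()
    try:
        await run_consumer(layer, failing_handler)
    except RuntimeError:
        pass
    # Number of channels still registered after the failure.
    return len(layer.channels)
```

The hasattr() guard keeps the hook optional, so layers that do not define clean_channel() would keep working unchanged.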

Memory going down…

Memory management at the OS-level is whack enough (i.e. calling free() in most real-world applications doesn’t cause a drop in memory consumption reported by the OS, due to fragmentation). Then you add cpython’s memory manager / garbage collector on top of that… and you just cannot expect memory to go down even in times you might expect it. (similar to what @qeternity is saying as well)
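One way to separate the two effects is to look at Python-level accounting instead of the number the OS reports. The sketch below uses the standard library's tracemalloc on a deque of bytes objects (the sizes and counts are arbitrary, chosen only for illustration). It does not measure the process RSS at all; it only shows that the interpreter considers the memory freed once the deque is dropped, regardless of what the OS reports afterwards.

```python
import tracemalloc
from collections import deque


def measure():
    """Return traced bytes while a buffer is held and after it is dropped."""
    tracemalloc.start()
    # Roughly models buffered messages: many small bytes objects in a deque.
    buffered = deque(bytes(100) for _ in range(50_000))
    held, _peak = tracemalloc.get_traced_memory()
    del buffered  # no reference cycles, so CPython frees this immediately
    released, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return held, released
```

If the traced figure drops after cleanup but the OS-reported figure does not, that points to the allocator and fragmentation rather than to objects that are still referenced.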

My use case…

Again, like @qeternity’s case, in our case we cycle in/out containers often enough (usually because we’re deploying new versions of our app) that this leak isn’t hurting us in production. That said, it would certainly be great to fix it.

Another thing to note: when I control + C my client, I see this traceback logged in the runserver terminal:

WebSocket DISCONNECT /echo/stream/ [127.0.0.1:45544]
Exception inside application: Attempt to send on a closed protocol
Traceback (most recent call last):
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/staticfiles.py", line 44, in __call__
    return await self.application(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/routing.py", line 71, in __call__
    return await application(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/sessions.py", line 47, in __call__
    return await self.inner(dict(scope, cookies=cookies), receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/sessions.py", line 263, in __call__
    return await self.inner(wrapper.scope, receive, wrapper.send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/auth.py", line 185, in __call__
    return await super().__call__(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/middleware.py", line 26, in __call__
    return await self.inner(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/routing.py", line 150, in __call__
    return await application(
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 94, in app
    return await consumer(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 58, in __call__
    await await_many_dispatch(
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/home/sbf/channels_app/proj/app/consumers.py", line 20, in internal_message
    await self.send(text_data=event["text"])
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/generic/websocket.py", line 209, in send
    await super().send({"type": "websocket.send", "text": text_data})
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 81, in send
    await self.base_send(message)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/sessions.py", line 226, in send
    return await self.real_send(message)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/daphne/server.py", line 234, in handle_reply
    protocol.handle_reply(message)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/daphne/ws_protocol.py", line 202, in handle_reply
    self.serverSend(message["text"], False)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/daphne/ws_protocol.py", line 256, in serverSend
    self.sendMessage(content.encode("utf8"), binary)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/autobahn/websocket/protocol.py", line 2254, in sendMessage
    raise Disconnected("Attempt to send on a closed protocol")
autobahn.exception.Disconnected: Attempt to send on a closed protocol

To get around this traceback I added some try / except in my consumers.py

async def internal_message(self, event):
    try:
        await self.send(text_data=event["text"])
    except Exception:
        await self.websocket_disconnect(None)

async def websocket_disconnect(self, message):
    await super().websocket_disconnect(message)

Now when my client disconnects, I no longer see memory growth, which is good. However, I expected the memory to go down (i.e. all of those buffered messages should be released), but it does not. I don’t believe that queue is being cleaned up.

If there are messages in the channel Queue, and the consumer disconnects (I control + C my client), I am not hitting https://github.com/django/channels_redis/blob/bba93196d8fe5e5fbfc470350c1f3da168c56739/channels_redis/pubsub.py#L191

But if there are no messages in the channel Queue, and the consumer disconnects, then I do hit that del self.channels[channel]

I can reproduce this reliably each time.
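The difference between the two cases can be modelled with plain asyncio. The sketch below is a simplified toy, not the real pubsub.py: ToyLayer and disconnect are hypothetical names, and the only thing carried over from the thread is the pattern of cleaning up inside an except CancelledError in receive(). When the queue is empty, receive() is suspended and the cancel reaches it. When a message is already waiting, receive() returns without ever suspending, so the task is already finished when it is cancelled and the cleanup never runs.

```python
import asyncio


class ToyLayer:
    """Simplified model of cleanup-on-cancel; not the channels_redis code."""

    def __init__(self):
        self.channels = {}

    def new_channel(self, name):
        self.channels[name] = asyncio.Queue()

    async def receive(self, name):
        queue = self.channels[name]
        try:
            return await queue.get()
        except asyncio.CancelledError:
            # Only cleanup hook: runs only if cancelled while waiting.
            self.channels.pop(name, None)
            raise


async def disconnect(layer, name):
    """Start receive(), cancel it, and report whether the channel leaked."""
    task = asyncio.create_task(layer.receive(name))
    await asyncio.sleep(0)  # let receive() run up to its first suspension
    task.cancel()  # no effect if the task has already finished
    try:
        await task
    except asyncio.CancelledError:
        pass
    return name in layer.channels


async def demo():
    layer = ToyLayer()
    layer.new_channel("empty")
    layer.new_channel("busy")
    layer.channels["busy"].put_nowait(b"x")  # a message is already buffered
    leaked_empty = await disconnect(layer, "empty")
    leaked_busy = await disconnect(layer, "busy")
    return leaked_empty, leaked_busy
```

In this model the "busy" channel stays in layer.channels along with its queue, which matches the observation that the del is only reached when the queue is empty.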

Ha! I just saw that @acu192 is already on it about 20 min ago!

@fosterseth I’m having a quick look now, just in case the far more capable @acu192 is busy at the moment

put up a PR ^

hopefully someone can help test this out

pip install git+https://github.com/fosterseth/channels_redis.git@clean_channels
pip install git+https://github.com/fosterseth/channels.git@clean_channels

However, I expected the memory to go down (i.e. all of those buffered messages should be released), but it does not. I don’t believe that queue is being cleaned up.

@fosterseth given CPython’s memory allocator, is this really to be expected?

There is a memory leak somewhere, unfortunately we end up cycling k8s containers often enough that it’s not much of an issue and haven’t investigated further.

I set up a basic channels app to explore this a bit

my generator


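from channels.layers import get_channel_layer


async def run_async():
    # The channel layer does not change between sends, so fetch it once.
    channel_layer = get_channel_layer()
    msg = {'type': 'internal.message', 'text': 'sbf'}
    # Send one million messages to the 'default' group.
    for _ in range(1000000):
        await channel_layer.group_send('default', msg)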

I have a client that reads these (from the consumer) as fast as it can. Importantly, I let the client run for a bit, then hard control + C the client to disconnect it. I see the server detects this disconnect too

WebSocket DISCONNECT /echo/stream/ [127.0.0.1:36750]
websocket_disconnect ===
 asgispecific.0af216d849024bf086de8a873bfd5c37

Now, the generator is still pumping out tons of messages, and I notice the memory of my “python manage.py runserver” process is growing a lot. After all the messages are sent, the memory usage remains high indefinitely (does not go back down).

Note: I did not notice this growth when using channels_redis.core.RedisChannelLayer, only when using PubSub

Toward the end of sending the messages, I reconnect my client. I have a pdb debugger set up on the consumer to trigger after all of the messages have been sent, and I use guppy3 to print a heap:

hp = guppy.hpy()
hp.heap()

Partition of a set of 918491 objects. Total size = 88449575 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0 626517  68 41201396  47  41201396  47 bytes
     1  99483  11 10011250  11  51212646  58 str
     2     40   0  9632448  11  60845094  69 collections.deque
     3  74337   8  5253008   6  66098102  75 tuple
     4  21609   2  3881513   4  69979615  79 types.CodeType
     5   3607   0  3717464   4  73697079  83 type
     6  22077   2  3179088   4  76876167  87 function
     7   7854   1  2496048   3  79372215  90 dict (no owner)
     8   3607   0  1737608   2  81109823  92 dict of type
     9   1019   0  1330648   2  82440471  93 dict of module

Okay, so 68% of those objects (47% of the total size) are bytes. Let’s zoom in:

hp.heap()[0].byrcs

Partition of a set of 1626520 objects. Total size = 106201611 bytes.
 Index  Count   %     Size   % Cumulative  % Referrers by Kind (class / dict of class)
     0 1589575  98 103322357  97 103322357  97 collections.deque
     1  34735   2  2790813   3 106113170 100 types.CodeType
     2   1368   0    51518   0 106164688 100 dict (no owner)
     3    653   0    26241   0 106190929 100 tuple
     4    104   0     5914   0 106196843 100 dict of module
     5     14   0      814   0 106197657 100 re.Pattern, tuple
     6      9   0      770   0 106198427 100 dict of type
     7     15   0      698   0 106199125 100 list
     8      6   0      443   0 106199568 100 aioredis.util.coerced_keys_dict
     9      6   0      443   0 106200011 100 dict of aioredis.pubsub._Sender, tuple

all of that data is in collections.deque objects
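This would be consistent with the per-channel buffers being asyncio.Queue objects, which is an assumption on my part about how the layer stores messages. In CPython, an asyncio.Queue keeps its items in a collections.deque, and with the default maxsize it is unbounded. A quick check (note that _queue is a private attribute, used here only for inspection, and inspect_queue is just an illustrative helper):

```python
import asyncio
from collections import deque


def inspect_queue():
    """Show what backs an asyncio.Queue and that it accepts items freely."""
    q = asyncio.Queue()  # maxsize defaults to 0, meaning unbounded
    for _ in range(3):
        q.put_nowait(b"payload")  # never raises QueueFull when unbounded
    # _queue is a private CPython implementation detail.
    return type(q._queue), q.qsize()
```

So if nothing reads from or deletes a channel's queue, every message sent to it would stay referenced by that deque.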

Hopefully this sheds some light on this issue