channels: Memory leak if writing to a channel that is never read from later

It appears that if you write a message to a channel (for example via group_send) and no reader ever appears on that channel, the messages remain indefinitely in the in-memory queue channels.layers.channel_layers.backends['default'].receive_buffer when using the RedisChannelLayer backend. In particular, I have captured a server with over 100k items in that dictionary.

One way to avoid this problem would be to extend the API for group_send with a time-to-live parameter so that messages would expire over time if they weren’t read. Thoughts?
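To make the TTL idea concrete, here is a minimal sketch of expiry on the read side. This is purely hypothetical: `ExpiringBuffer` and the `ttl` parameter do not exist in channels or channels_redis; the point is just that tagging each queued message with a deadline lets stale messages be discarded lazily, so an unread channel cannot grow forever.

```python
import time
from collections import deque

class ExpiringBuffer:
    """Toy per-channel buffer: each message carries an expiry deadline,
    and expired messages are dropped lazily on read. A hypothetical
    sketch of the proposed TTL behaviour, not the real implementation."""

    def __init__(self):
        self._queue = deque()

    def put(self, message, ttl):
        # Store the message together with the time at which it expires.
        self._queue.append((time.monotonic() + ttl, message))

    def get(self):
        # Skip any messages whose TTL has elapsed before returning one.
        while self._queue:
            deadline, message = self._queue.popleft()
            if time.monotonic() < deadline:
                return message
        return None

buf = ExpiringBuffer()
buf.put({"type": "chat.message", "text": "stale"}, ttl=0.0)  # expires immediately
buf.put({"type": "chat.message", "text": "fresh"}, ttl=60.0)
print(buf.get()["text"])  # the expired message is skipped; prints "fresh"
```

A real implementation would also need the TTL to survive the Redis hop, but the read-side cleanup is the part that actually bounds the receive_buffer.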

My pip freeze, in case it’s useful:

channels==2.1.2
channels-redis==2.2.1

About this issue

  • State: open
  • Created 5 years ago
  • Comments: 21 (15 by maintainers)

Most upvoted comments

…clients poll for messages instead of subscribing to messages.

Really happy to look at sketches of a reworking there.

channels.consumer.AsyncConsumer.__call__:

    async def __call__(self, receive, send):
        """
        Dispatches incoming messages to type-based handlers asynchronously.
        """
        async with contextlib.AsyncExitStack() as stack:
            # Initialize channel layer
            self.channel_layer = get_channel_layer(self.channel_layer_alias)
            if self.channel_layer is not None:
                channel = await stack.enter_async_context(self.channel_layer.new_channel_v2())
                self.channel_name = channel.name
            # Store send function
            if self._sync:
                self.base_send = async_to_sync(send)
            else:
                self.base_send = send
            # Pass messages in from channel layer or client to dispatch method
            try:
                if self.channel_layer is not None:
                    await await_many_dispatch(
                        [receive, channel], self.dispatch
                    )
                else:
                    await await_many_dispatch([receive], self.dispatch)
            except StopConsumer:
                # Exit cleanly
                pass

A channel object is an async iterator: __anext__() returns a message, and aclose() stops the world.

I think it would be easier to write against this API. I don’t have time to actually submit and test a pull request, though 😃.
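To illustrate the shape of that API, here is a toy channel object along the lines sketched above. Everything here is hypothetical: `new_channel_v2` does not exist in channels today, and this `Channel` class is just a demonstration of the async-iterator-plus-`aclose()` contract backed by an `asyncio.Queue`.

```python
import asyncio

class Channel:
    """Toy channel object for the proposed API: an async iterator that
    yields messages until aclose() is called. A sketch only, not an
    existing channels API."""

    def __init__(self, name):
        self.name = name
        self._queue = asyncio.Queue()

    def __aiter__(self):
        return self

    async def __anext__(self):
        message = await self._queue.get()
        if message is None:  # sentinel pushed by aclose()
            raise StopAsyncIteration
        return message

    async def aclose(self):
        # Push a sentinel so any pending or future __anext__ terminates.
        await self._queue.put(None)

async def demo():
    channel = Channel("specific.abc123")
    await channel._queue.put({"type": "hello"})
    await channel.aclose()
    # Queued messages are still drained before iteration stops.
    return [msg async for msg in channel]

print(asyncio.run(demo()))  # [{'type': 'hello'}]
```

The appeal of this shape is that consumer code becomes a plain `async for` loop, and cancellation/cleanup is a single `aclose()` call via the exit stack.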

@davidfstr @carltongibson

A version of this bug that I’ve also seen - you don’t just hit this when the channel is never read from. You can also see it if the channel is read from too slowly. In the default channels_redis implementation, the per-channel asyncio.Queue objects grow without bound; if they aren’t drained at the same rate messages are inserted on the other end, Daphne’s memory consumption will simply keep growing forever.

I’d argue that these per-channel Queue objects should probably be bounded in size. There’s already a capacity argument; maybe the per-channel buffer should respect that, and only buffer up to that many messages before dropping old ones?

https://github.com/django/channels_redis#capacity

I do think a goal of passive cleanup makes sense, but I think a reasonable upper bound on queue size would likely prevent many people from getting into bad situations in the first place.
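A minimal sketch of the bounded-buffer idea, assuming the drop-oldest policy suggested above: Python’s `collections.deque` with a `maxlen` already discards the oldest item when a new one is appended past capacity, so respecting `capacity` per channel could be as small as this. (This is not what channels_redis does today.)

```python
from collections import deque

class BoundedChannelBuffer:
    """Per-channel buffer that respects a capacity: once full, the
    oldest message is silently dropped. A sketch of the proposed
    behaviour, not the current channels_redis implementation."""

    def __init__(self, capacity=100):
        # deque(maxlen=...) evicts from the left as items are appended
        # on the right, giving drop-oldest semantics for free.
        self._messages = deque(maxlen=capacity)

    def put(self, message):
        self._messages.append(message)

    def get(self):
        return self._messages.popleft() if self._messages else None

    def __len__(self):
        return len(self._messages)

buf = BoundedChannelBuffer(capacity=2)
for i in range(5):
    buf.put(i)
print(len(buf), buf.get(), buf.get())  # 2 3 4
```

Whether dropped messages should be logged or counted is a separate design question, but the memory bound itself is cheap to enforce.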

Status: Bug doesn’t hurt enough that anyone has put in the effort to fix it yet.

To work around this (and other kinds of slow memory leaks), I’ve configured most of my web services to restart each worker after serving (N + random jitter) requests. Gunicorn in particular has a config option for this.
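For reference, the Gunicorn settings in question are `max_requests` and `max_requests_jitter`; a `gunicorn.conf.py` along these lines implements the restart-with-jitter workaround (the specific numbers here are illustrative, not a recommendation):

```python
# gunicorn.conf.py -- recycle each worker after a bounded number of
# requests, with jitter so workers don't all restart at the same time.
max_requests = 1000          # restart a worker after ~1000 requests
max_requests_jitter = 100    # randomize the limit by up to 100 requests
```

This doesn’t fix the leak, of course; it just caps how much damage an unbounded buffer can do before the process is replaced.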