channels_redis: v4b1: PubSub receive cleanup hang

Following discussion on https://github.com/django/channels_redis/pull/317

On 4.0.0b1, test_groups_basic in either test_pubsub.py or test_pubsub_sentinel.py can hang intermittently. This is most pronounced in CI environments (the GitHub Actions runs for this repo show some examples on PRs); locally it occurs for me roughly every 6-8 runs of the snippet below.

The hang occurs with a RedisPubSubChannelLayer when checking that a message is not received on a particular channel. Here is a small test for test_pubsub that reproduces the issue more easily:

import asyncio

import async_timeout
import pytest

@pytest.mark.asyncio
async def test_receive_hang(channel_layer):
    channel_name = await channel_layer.new_channel(prefix="test-channel")
    with pytest.raises(asyncio.TimeoutError):
        async with async_timeout.timeout(1):
            await channel_layer.receive(channel_name)

Preliminary tracing found that receive, when attempting to unsubscribe during cleanup, never gets a connection back from _get_sub_conn.

Across multiple attempts, the _receive_task appears to never finish after being cancelled, holding the lock indefinitely.

The following print annotations in _get_sub_conn,

    async def _get_sub_conn(self):
        if self._keepalive_task is None:
            self._keepalive_task = asyncio.ensure_future(self._do_keepalive())
        if self._lock is None:
            self._lock = asyncio.Lock()
        print(self._lock)
        async with self._lock:
            if self._sub_conn is not None and self._sub_conn.connection is None:
                await self._put_redis_conn(self._sub_conn)
                self._sub_conn = None
                self._notify_consumers(self.channel_layer.on_disconnect)
            if self._sub_conn is None:
                if self._receive_task is not None:
                    print(self._receive_task)
                    self._receive_task.cancel()
                    try:
                        print("waiting for receive_task")
                        await self._receive_task
                    except asyncio.CancelledError:
                        print("receive_task cancelled")
                        # This is the normal case, that `asyncio.CancelledError` is thrown. All good.
                        pass

produce, on a hang, the following output:

<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<Task pending name='Task-4' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:409> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f88fd895490>()]>>
waiting for receive_task
receive_task cancelled
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<Task pending name='Task-5' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:391> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f49b6af5c70>()]>>
waiting for receive_task
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [locked]>

Successful runs have the last line swapped for "receive_task cancelled" and a clean exit.

Ideas so far from the above are:

  1. We are consistently losing the connection to Redis during the test
  2. _receive_task has here and here as the prime blocking candidates (see the sketch below for the general shape of the hang)
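
To make the second idea concrete, below is a minimal, self-contained sketch of the shape the hang appears to take. This is hypothetical demo code, not channels_redis itself: if the cancellation thrown into the receive loop is swallowed somewhere (for example converted or eaten by a timeout wrapper), the task never actually ends, and the `await self._receive_task` inside _get_sub_conn then waits forever while still holding the lock.

import asyncio

async def stubborn_receiver():
    # Stand-in for _do_receiving: if something swallows the CancelledError,
    # the task keeps running even after cancel() has been called.
    swallowed = False
    while True:
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            if swallowed:
                raise  # give up on the second cancel so this demo can exit
            swallowed = True

async def main():
    receive_task = asyncio.ensure_future(stubborn_receiver())
    await asyncio.sleep(0)  # let the task start and block

    receive_task.cancel()
    try:
        # Mirrors `await self._receive_task` in _get_sub_conn: because the
        # cancellation was swallowed, this wait never completes on its own,
        # and in the real code the lock around it is never released.
        await asyncio.wait_for(asyncio.shield(receive_task), timeout=1)
    except asyncio.TimeoutError:
        print("still waiting on the 'cancelled' receive task -- this is the hang")

    receive_task.cancel()  # clean up so the demo exits
    await asyncio.gather(receive_task, return_exceptions=True)

asyncio.run(main())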

Most upvoted comments

Hi all - hope you’re well, figured I’d pop my head in since I had some free time and see if I could lend a hand.

This jumped out as something interesting to investigate, and I can’t quite make heads or tails of it after a few minutes of poking about. But I had a feeling that it was something to do with the async timeouts package, and a quick look at their repo led me to this old issue which has repro code that looks suspiciously similar to some of our patterns: https://github.com/aio-libs/async-timeout/issues/229#issuecomment-908502523

Anyway, will take another look tomorrow when I have more time.

I’ve rolled in #326 and pushed 4.0.0b2 to PyPI. I’d be grateful if folks could try it out — looking for final releases next week. 👍

Ok - this is now running pretty well in our chaos harness.

Ok, great @qeternity.

If we can get a solution here in the next week or so that would be great, otherwise I’ll push the release anyway, and we’ll have to fix it later. 😜

I’ve found at least a hacky workaround:

async def _do_keepalive(self):
    while True:
        await asyncio.sleep(3)  # bumped above the test timeouts; the original sleep is shorter
        try:
            await self._get_sub_conn()
        except Exception:
            logger.exception("Unexpected exception in keepalive task:")

Bumping the sleep on the periodic keep alive task to anything greater than the timeouts in the tests stops any hang from occurring in my usually hang-happy local testing.

In a tight window while we’re doing our cleanup work, I believe the keep alive kicks off another _get_sub_conn, and it is that call that leaves receive hanging indefinitely.

There is quite a large docstring on _do_keepalive which may no longer all hold true with redis-py or the changes to _get_sub_conn, so I am curious to hear any input on that. Would the keep alive heartbeat be better off configurable rather than fixed? A rough sketch of that idea follows.
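
A minimal sketch of a configurable heartbeat, assuming a new keepalive_interval option; the class name, parameter name, and default here are illustrative, not the actual channels_redis API:

import asyncio
import logging

logger = logging.getLogger(__name__)


class ShardConnectionSketch:
    """Hypothetical stand-in for RedisSingleShardConnection; only the
    keepalive pieces are shown, and keepalive_interval is an assumed name."""

    def __init__(self, keepalive_interval=1.0):
        # Could be passed down from the channel layer config rather than
        # hard-coding the heartbeat inside _do_keepalive.
        self._keepalive_interval = keepalive_interval

    async def _get_sub_conn(self):
        ...  # the real logic lives in channels_redis/pubsub.py

    async def _do_keepalive(self):
        while True:
            await asyncio.sleep(self._keepalive_interval)
            try:
                await self._get_sub_conn()
            except Exception:
                logger.exception("Unexpected exception in keepalive task:")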

@bbrowning918 can you please have a look at this branch here: https://github.com/zumalabs/channels_redis/pull/11

I’m not entirely sure what the issue is, but the test highlighted a few improvements nonetheless.