redis-py: New `asyncio.exceptions.CancelledError` from 4.5.2

Version: 4.5.2
Platform: Python 3.11.2 on Debian unstable
Description:

Starting with 4.5.2, I’m seeing a new exception when attempting to write to Redis using the asyncio layer. The problem seems to be related to FastAPI: if I create a FastAPI app with any custom Starlette middleware and then try to set a key in Redis using a separate connection pool created with redis.asyncio.from_url, the set request fails immediately with an uncaught asyncio.exceptions.CancelledError and the following backtrace:

.tox/py/lib/python3.11/site-packages/gafaelfawr/storage/base.py:140: in store
    await self._redis.set(key, encrypted_data, ex=lifetime)
.tox/py/lib/python3.11/site-packages/redis/asyncio/client.py:509: in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
.tox/py/lib/python3.11/site-packages/redis/asyncio/connection.py:1408: in get_connection
    if await connection.can_read_destructive():
.tox/py/lib/python3.11/site-packages/redis/asyncio/connection.py:817: in can_read_destructive
    return await self._parser.can_read_destructive()
.tox/py/lib/python3.11/site-packages/redis/asyncio/connection.py:250: in can_read_destructive
    return await self._stream.read(1)
/usr/lib/python3.11/asyncio/streams.py:689: in read
    await self._wait_for_data('read')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <StreamReader transport=<_SelectorSocketTransport fd=15 read=polling write=<idle, bufsize=0>>>
func_name = 'read'

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.
    
        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')
    
        assert not self._eof, '_wait_for_data after EOF'
    
        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()
    
        self._waiter = self._loop.create_future()
        try:
>           await self._waiter
E           asyncio.exceptions.CancelledError

/usr/lib/python3.11/asyncio/streams.py:522: CancelledError

GitHub Actions failure log: https://github.com/lsst-sqre/gafaelfawr/actions/runs/4473916874/jobs/7861846517

Reverting to redis 4.5.1 makes the problem disappear again, so it appears to be triggered by some change in 4.5.2.

Here’s the shortest test case that I’ve come up with. It depends on FastAPI and Starlette but none of my code base. My guess is that something is happening during addition of the middleware that’s breaking something about the asyncio work inside redis, but I’m not sure what that could be. Oddly, adding one of the standard Starlette middleware classes does not trigger this problem, only a custom middleware class derived from BaseHTTPMiddleware.

from collections.abc import Callable, Awaitable

import pytest
import redis.asyncio
from fastapi import FastAPI, Request, Response
from httpx import AsyncClient
from starlette.middleware.base import BaseHTTPMiddleware


class DummyMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self,
        request: Request,
        call_next: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        return await call_next(request)


@pytest.mark.asyncio
async def test_redis() -> None:
    app = FastAPI()
    app.add_middleware(DummyMiddleware)

    @app.get("/")
    async def index() -> dict[str, str]:
        return {}

    async with AsyncClient(app=app, base_url="https://example.com/") as client:
        await client.get("/")
        pool = redis.asyncio.from_url("redis://localhost:6379/0")
        await pool.set("key", "value")

(If I had to guess, it would be the switch to asyncio.timeout, but I can’t find evidence to support that. I know asyncio.timeout does raise CancelledError and then converts it to TimeoutError, but I couldn’t find any place where that machinery shouldn’t work correctly. This error happens immediately, so it doesn’t seem to be a timeout and therefore I may be barking up the wrong tree.)
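For reference, here is a minimal sketch (not part of the original report, and requiring Python 3.11 or newer) of the asyncio.timeout machinery referred to above: the context manager cancels the waiting task and is expected to convert the resulting CancelledError into TimeoutError on exit.

import asyncio


async def main() -> None:
    try:
        # asyncio.timeout cancels the sleeping task when the deadline passes,
        # then absorbs the CancelledError and re-raises it as TimeoutError.
        async with asyncio.timeout(0.1):
            await asyncio.sleep(10)
    except TimeoutError:
        print("CancelledError was converted into TimeoutError as expected")


asyncio.run(main())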

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Reactions: 13
  • Comments: 27 (12 by maintainers)

Most upvoted comments

To everyone here, I think I found the culprit.

After checking that the issue from my reproduction example did not happen with the async_timeout library, I went looking at the differences between it and CPython’s implementation. I didn’t have to go far: looking at the history of the timeouts module, I found this commit: https://github.com/python/cpython/commit/04adf2df395ded81922c71360a5d66b597471e49

I applied that patch locally and the issue from my reproduction example was gone.

So this is actually a CPython issue and not a redis issue, and it is specific to Python 3.11, because on 3.10 and earlier the async_timeout library is still used.

What redis can do here is to change all imports:

if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
    from asyncio import timeout as async_timeout
else:
    from async_timeout import timeout as async_timeout

to

if sys.version_info >= (3, 11, 3):
    from asyncio import timeout as async_timeout
else:
    from async_timeout import timeout as async_timeout

This works because the change was cherry-picked to the 3.11 branch and will be available in the next point release (3.11.3; the current one is 3.11.2).

I actually already opened a PR for that: https://github.com/redis/redis-py/pull/2659

Edit: the CPython PR: https://github.com/python/cpython/pull/102815
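As an aside (not from the thread), the tuple form of the check also accounts for the patch level, which the original major/minor comparison cannot express:

import sys

# sys.version_info compares as a tuple, so the patch level participates in
# the comparison: this is False on the affected 3.11.2 (keeping the
# async_timeout fallback) and True on 3.11.3 and later (using the fixed
# stdlib asyncio.timeout).
print(sys.version_info >= (3, 11, 3))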

@sileht I have created a minimal reproduction example using httpx and redis@4.5.3

import asyncio

import httpx
import redis.asyncio as redis

pool = redis.ConnectionPool(host="localhost", port=6379, db=0)


async def without_httpx():
    print("without_httpx")
    conn = redis.Redis(connection_pool=pool)
    print(await conn.get("foo"))
    print(await conn.get("bar"))


async def with_httpx_async():
    print("without_httpx_async")
    conn = redis.Redis(connection_pool=pool)
    print(await conn.get("foo"))
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/get")
        print(response.json())
    print(await conn.get("bar"))


async def main():
    conn = redis.Redis(connection_pool=pool)
    await conn.set("foo", "1")
    await conn.set("bar", "2")
    await without_httpx()
    await with_httpx_async()


if __name__ == "__main__":
    asyncio.run(main())

Running it will result in:

without_httpx
b'1'
b'2'
with_httpx_sync
b'1'
{'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-httpx/0.23.3', 'X-Amzn-Trace-Id': 'Root=1-641c81d6-2e145a8c4ca1ee0a178c2573'}, 'origin': '45.4.34.79', 'url': 'https://httpbin.org/get'}
b'2'
without_httpx_async
b'1'
{'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-httpx/0.23.3', 'X-Amzn-Trace-Id': 'Root=1-641c81d6-3ec88c7b5bd6013d2201d553'}, 'origin': '45.4.34.79', 'url': 'https://httpbin.org/get'}
Traceback (most recent call last):
  File "/tmp/xxx/test.py", line 46, in <module>
    asyncio.run(main())
  File "/home/bellini/.local/share/rtx/installs/python/3.11.2/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/bellini/.local/share/rtx/installs/python/3.11.2/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/bellini/.local/share/rtx/installs/python/3.11.2/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/tmp/xxx/test.py", line 42, in main
    await with_httpx_async()
  File "/tmp/xxx/test.py", line 33, in with_httpx_async
    print(await conn.get("bar"))
          ^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/xxx/venv/lib/python3.11/site-packages/redis/asyncio/client.py", line 509, in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/xxx/venv/lib/python3.11/site-packages/redis/asyncio/connection.py", line 1408, in get_connection
    if await connection.can_read_destructive():
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/xxx/venv/lib/python3.11/site-packages/redis/asyncio/connection.py", line 817, in can_read_destructive
    return await self._parser.can_read_destructive()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/xxx/venv/lib/python3.11/site-packages/redis/asyncio/connection.py", line 250, in can_read_destructive
    return await self._stream.read(1)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/bellini/.local/share/rtx/installs/python/3.11.2/lib/python3.11/asyncio/streams.py", line 689, in read
    await self._wait_for_data('read')
  File "/home/bellini/.local/share/rtx/installs/python/3.11.2/lib/python3.11/asyncio/streams.py", line 522, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

Testing with 4.5.2 also gives the same error, but installing 4.5.1 makes the code work without any errors.

With 4.5.1 the output is as expected:

without_httpx
b'1'
b'2'
without_httpx_async
b'1'
{'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'python-httpx/0.23.3', 'X-Amzn-Trace-Id': 'Root=1-641c825f-044e318c246993f739b4a1b6'}, 'origin': '45.4.34.79', 'url': 'https://httpbin.org/get'}
b'2'

I don’t know if httpx is doing something that it shouldn’t, but it is strange that it later affected redis for no obvious reason.

Thanks for the help, upgrading to 3.11.4 worked for me.

I’m facing exactly the same issue on version 4.5.5 using Python 3.11.2.

The issue started after the anyio version was bumped from 3.6.2 to 3.7.0. The suggested solution of updating Python to 3.11.3 did the trick for me.

@curtiscook Yes, this issue was fixed in 4.5.4 (#2659).

@bellini666 nice find!

Looks like the 4.5.3 release didn’t resolve the issue.

  • Just closed, then reopened.

Since redis uses a connection pool by default, this implies to me that there are two bugs:

  • asyncio.exceptions.CancelledError closing the redis connection (existing)
  • Redis not reconnecting on disconnect when using a connection pool (new)
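For illustration only (not from the thread), this is the reconnect behaviour the second bullet expects from a pooled client: after every pooled connection is force-closed, the next command should transparently get a fresh connection. The sketch assumes redis-py’s asyncio API and a local Redis server.

import asyncio

import redis.asyncio as redis


async def main() -> None:
    # A client backed by a connection pool, as in the reports above.
    pool = redis.ConnectionPool(host="localhost", port=6379, db=0)
    client = redis.Redis(connection_pool=pool)
    await client.set("key", "value")

    # Force-close all pooled connections, simulating a dropped connection.
    await client.connection_pool.disconnect()

    # Expected behaviour: the pool hands out a fresh connection and the
    # command succeeds; the bullet above suggests this reconnect is not
    # happening in the affected versions.
    print(await client.get("key"))

    await client.close()


asyncio.run(main())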

FWIW: I’m experiencing the same issue. I’ve noticed that the connection does work, but it seems to break if you await a function that isn’t Redis. We were able to reproduce this on the command line without Starlette (even though we are using FastAPI) given:

from datetime import timedelta
from typing import Any, Optional

import redis.asyncio as aioredis
from redis.asyncio import Redis

REDIS_URL = "redis://localhost:6379/0"  # placeholder; the real URL isn't shown
_redis_pool: Optional[Redis] = None


def get_redis_pool() -> Redis:
    global _redis_pool

    if _redis_pool is None or _redis_pool.connection is None:
        _redis_pool = aioredis.from_url(REDIS_URL, max_connections=10)
    assert _redis_pool is not None
    return _redis_pool


async def get_key(key: str) -> Any:
    redis = get_redis_pool()
    return await redis.get(key)


async def set_key(key: str, value: Any, expire_seconds: Optional[int] = None) -> bool:
    redis = get_redis_pool()
    if expire_seconds is not None:
        exp = timedelta(seconds=expire_seconds)
        await redis.set(key, value, ex=exp)
    else:
        await redis.set(key, value)
    return True

This function will work fine:

async def foo():
	await get_key("key2")
	await set_key("key2", "value")

This will break:

async def foo():
	await get_key("key2")
	await bar()
	await set_key("key2", "value")

It might be related that bar uses HTTPX.
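bar isn’t shown in the comment above; a hypothetical sketch of it, assuming it simply awaits an HTTPX request as hinted, would be:

import httpx


# Hypothetical bar() (not in the original comment): awaiting non-Redis work
# such as an HTTPX request between the Redis calls appears to be what
# triggers the CancelledError described in this issue.
async def bar() -> None:
    async with httpx.AsyncClient() as client:
        await client.get("https://example.com/")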