httpcore: Request errors on overloaded systems may poison the connection pool

This is somewhat related to #502, as we first started observing the behaviour below in httpcore 0.14.5. Upgrading to 0.14.7 seemingly fixed it, but we found that there is still an underlying race condition that can poison the pool, making it unable to serve requests ever again. The issue comes up with httpx==0.22.0 / httpcore==0.14.7; it is not present with httpx==0.16.1 / httpcore==0.12.3.

Here is a script that brute-forces the client and reliably triggers the issue on my machine. (The script is written for the sync API, but we are hitting this issue in production with the async API as well.)

Repro script

PARALLELISM and REQUESTS may need adjustments to reproduce the issue.

import multiprocessing.pool
import socket
import httpx

PARALLELISM = 20
REQUESTS = 200

unavailable_listener = socket.socket()
unavailable_listener.bind(("localhost", 0))
unavailable_listener.listen()

unavailable_url = "http://{}:{}".format(*unavailable_listener.getsockname())

client = httpx.Client(limits=httpx.Limits(max_connections=1, max_keepalive_connections=0))

pool = multiprocessing.pool.ThreadPool(PARALLELISM)

# client.get(...) works well at this point

# Raises PoolTimeout as expected because the pool has been exhausted
try:
    pool.map(lambda _: client.get(unavailable_url), range(REQUESTS))
except httpx.TimeoutException:
    # Catch the broad timeout base class: the workers end with a mix of
    # PoolTimeout and ReadTimeout, and pool.map re-raises one of them.
    pass

# client.get(...) will still raise PoolTimeout even though there are supposed to be no outstanding connections
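
# Illustrative continuation (not part of the original script): a follow-up
# request should at worst hit a read timeout, but instead it times out waiting
# for the pool, because the leftover CONNECTING connection is never released.
try:
    client.get(unavailable_url, timeout=httpx.Timeout(5.0, pool=1.0))
except httpx.PoolTimeout:
    print("Pool is poisoned: no connection ever becomes available")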

After running the above code, the pool is no longer operational, containing a stuck connection:

>>> client._transport._pool._pool
[<HTTPConnection [CONNECTING]>]
>>> client._transport._pool._pool[0]._connect_failed
False
>>> client._transport._pool._pool[0]._connection
>>> 
>>> client._transport._pool._pool[0].is_available()
False

My hunch is that RequestStatus.set_connection() succeeds when called from _attempt_to_acquire_connection, but status.wait_for_connection() can still time out on an overloaded system, leaving the half-created connection behind. Subsequent request attempts can’t use this connection instance because it isn’t available, and it is never cleaned up because it is neither closed nor failed.

The interesting thing about this situation is that a freshly created connection can be put into the pool without any guarantee that it will actually be used by the current requests.
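
To make the suspected race concrete, here is a rough sequence sketch in comment form (the method names are the ones mentioned above; the exact call sites inside httpcore may differ):

# Suspected sequence (simplified):
#
# 1. The pool calls _attempt_to_acquire_connection(status) for a queued request:
#      - a fresh <HTTPConnection [CONNECTING]> is appended to the pool
#      - status.set_connection(conn) succeeds, so the request is considered served
# 2. Meanwhile the queued request is blocked in status.wait_for_connection(timeout=...).
#    On an overloaded system the pool timeout fires before the waiter observes the
#    connection it was just given, and PoolTimeout propagates out.
# 3. The half-created connection stays in the pool:
#      conn._connection is None, conn._connect_failed is False
#    so it is neither idle, expired, closed nor failed (never cleaned up), and
#    is_available() is False (never reused). The pool is poisoned.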

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 12
  • Comments: 18 (3 by maintainers)

Most upvoted comments

I experimented with versions 0.16.3, 0.17.1 and 0.18.0.

With a long-responding server on localhost:5522, the following code fails with PoolTimeout. You can get and run such a long-responding server from this comment: https://github.com/encode/httpx/issues/1171#issuecomment-1850923234
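
For reference, a minimal stand-in for such a long-responding server (my own sketch, not the exact script from the linked comment) can be a listener that reads the request and never replies:

import asyncio


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Read the request but never send a response, so the client hangs
    # until its read (or pool) timeout fires.
    await reader.read(65536)
    await asyncio.sleep(3600)
    writer.close()


async def serve() -> None:
    server = await asyncio.start_server(handle, "localhost", 5522)
    async with server:
        await server.serve_forever()


asyncio.run(serve())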

I expect that the following code should not raise PoolTimeout, especially since I cancelled all the tasks and then waited an additional 5 seconds for cancellation to complete.

import asyncio
import httpcore


async def main() -> None:
    async with httpcore.AsyncConnectionPool(max_connections=3) as pool:
        async def do_one_request():
            return await pool.request("GET", "http://localhost:5522/", extensions={"timeout": {"pool": 1}})

        # First, create many requests, then cancel while they are in progress.
        tasks = []
        for i in range(5):
            tasks.append(asyncio.create_task(do_one_request()))
            await asyncio.sleep(0.0001)
            tasks[-1].cancel()

        print("Wait reasonable amount of time")
        await asyncio.sleep(5)
        print("Starting another request will now fail with a `PoolTimeout`")
        await do_one_request()


asyncio.run(main())

My case is probably related to the issue discussed here.

We are facing this, or very similar, behaviour under some load in a production microservice after upgrading httpx (to httpx==0.23.0) and Python to 3.10. After some time and several ConnectTimeout/ReadTimeout errors, every single request fails with PoolTimeout. Downgrading to httpx==0.18.1 solved the issue.

Could reproduce the issue with pure httpcore (adapted @vlaci's script, `httpcore==0.16.3`):
import asyncio
import traceback
from collections import Counter
from socket import socket

import httpcore

extensions = {
    "timeout": {
        "connect": .2,
        "read": .2,
        "pool": .2,
        "write": .2,
    },
    # "trace": log,
}


def report(pool):
    print("-- Report:", f"--- {pool.connections=}", f"--- {pool._requests=}", sep="\n")
    if pool.connections:
        conn = pool.connections[0]
        print(f"--- {conn=}")
        print(f"--- Connection has expired? - {conn.has_expired()}")
        print(f"--- Inner connection is {conn._connection}")


async def main():
    _socket = socket()
    _socket.bind(("localhost", 0))
    _socket.listen()

    async with httpcore.AsyncConnectionPool(
            max_connections=1,
            max_keepalive_connections=0,
    ) as pool:
        # Control
        await pool.request("GET", "https://en.wikipedia.org/wiki/2023")
        print("- Control is OK")
        report(pool)

        # Saturate pool
        num_tasks = 600
        url = "http://{}:{}".format(*_socket.getsockname())
        results = await asyncio.gather(
            *[
                asyncio.create_task(pool.request(
                    "GET",
                    url,
                    extensions=extensions,
                ))
                for _ in range(num_tasks)
            ],
            return_exceptions=True,
        )
        print(f"- Finished saturation\n-- ({num_tasks=}): {dict(Counter(type(res) for res in results))}")
        report(pool)

        # Control
        try:
            await pool.request("GET", "https://en.wikipedia.org/wiki/2023", extensions=extensions)  # PoolTimeout
            # await pool.request("GET", url, extensions=extensions)  Expected ReadTimeout, got PoolTimeout
        except httpcore.PoolTimeout:
            print("- ERROR: Got pool timeout")
            traceback.print_exc(chain=True)
        else:
            print("- No pool timeout!")
Some observations Every task is trying to acquire connection (`AsyncConnectionPool._attempt_to_acquire_connection` in `AsyncConnectionPool.handle_async_request`).

The first one (let's call it t1) succeeds and passes through status.wait_for_connection in AsyncConnectionPool.handle_async_request, while the other tasks hang there.

Then t1 creates a real connection in connection.handle_async_request(request), but the request fails with ReadTimeout.

This leads to a call to self.response_closed(status): the real connection is closed, the connection wrapper is removed from the pool, and the request status is removed from its separate register as well. After this, a new request status creates another connection wrapper with an empty connection. Under normal conditions, acquiring a new connection wrapper triggers status.wait_for_connection in some task, a new real connection gets established, and so on. But under load the asyncio event loop is not fast enough to switch between coroutines and a PoolTimeout happens instead. At that point nobody can establish a real connection (all the tasks have finished with PoolTimeout and been removed), and a connection wrapper without a real connection is left behind in the pool.

Now two things can happen.

The first: the pool receives a new request for some different URL. It tries to handle it but hangs until PoolTimeout in status.wait_for_connection, because the pool can’t create a new connection wrapper and the old one can neither process the request nor be removed (the connection wrapper has no real connection, so it is not idle or expired; it is “CONNECTING”).

The second: the pool receives a new request for the same URL as the “CONNECTING” connection wrapper already in the pool. In this case the request still can’t be processed: the wrapper can handle the request, but it is not available at the same time (its _connect_failed flag is False, its _connection is None, and the http2 property is False as well).

In short, it looks like there is a case where, under load, a connection wrapper in the pool can be stuck in the CONNECTING state and never obtain an actual connection. You can increase the pool timeout so that it is bigger than the other timeouts; even a slight excess greatly improves things, but pool poisoning is still possible.
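
For illustration, that partial mitigation amounts to bumping only the "pool" entry in the extensions dict used above (it narrows the race window but does not close it):

extensions = {
    "timeout": {
        "connect": .2,
        "read": .2,
        "write": .2,
        "pool": 1.0,  # strictly larger than the other timeouts
    },
}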

I see two approaches:

  • Introduce a CONNECTING expiration to trigger the is_expired cleanup (a sketch follows right after this list)
  • Kill one CONNECTING connection if no request status needs it
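
A rough, hypothetical sketch of the first idea (the mixin and the grace period value are my own invention, not httpcore code; the attribute names follow this thread):

import time

CONNECT_GRACE_PERIOD = 5.0  # illustrative grace period, in seconds


class ConnectingExpiryMixin:
    # Intended to be mixed into the pool's connection wrapper class.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._created_at = time.monotonic()

    def has_expired(self) -> bool:
        # Treat a wrapper that is still "CONNECTING" after the grace period as
        # expired, so the pool's existing expiry sweep can evict it.
        stuck_connecting = (
            self._connection is None
            and not self._connect_failed
            and time.monotonic() - self._created_at > CONNECT_GRACE_PERIOD
        )
        return stuck_connecting or super().has_expired()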

I tried the second approach and it worked well: no pool timeouts, and the “control” requests are successful:

async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
    # ... (existing method body elided) ...
    # Attempt to close CONNECTING connections that no one needs
    if self.is_full_pool:  # is_full_pool / is_connecting are helper additions from my draft
        for idx, connection in enumerate(self._pool):  # Check old connections first
            if not connection.is_connecting():
                continue
            for req_status in self._requests:
                if req_status is status:  # skip the current request
                    continue
                cur_origin = req_status.request.url.origin
                if connection.can_handle_request(cur_origin):
                    break
            else:  # No pending request can be handled by this connection
                await connection.aclose()
                self._pool.pop(idx)
                # print(f"XXX {id(status)} Zombie killed!")
                break

Can confirm that I’ve also reproduced the issue.

I used @nihilSup's example, adapted slightly to use `trio`...
import trio
import traceback
from collections import Counter
from socket import socket

import httpcore

extensions = {
    "timeout": {
        "connect": .2,
        "read": .2,
        "pool": .2,
        "write": .2,
    },
    # "trace": log,
}


def report(pool):
    print("-- Report:", f"--- {pool.connections=}", f"--- {pool._requests=}", sep="\n")
    if pool.connections:
        conn = pool.connections[0]
        print(f"--- {conn=}")
        print(f"--- Connection has expired? - {conn.has_expired()}")
        print(f"--- Inner connection is {conn._connection}")


async def make_request(pool, url, extensions):
    try:
        r = await pool.request(
            "GET",
            url,
            extensions=extensions,
        )
    except httpcore.TimeoutException as exc:
        print(type(exc))
    else:
        print(r)


async def main():
    _socket = socket()
    _socket.bind(("localhost", 0))
    _socket.listen()

    async with httpcore.AsyncConnectionPool(
            max_connections=1,
            max_keepalive_connections=0,
    ) as pool:
        # Control
        await pool.request("GET", "https://en.wikipedia.org/wiki/2023")
        print("- Control is OK")
        report(pool)

        # Saturate pool
        num_tasks = 300
        url = "http://{}:{}".format(*_socket.getsockname())
        async with trio.open_nursery() as nursery:
            for _ in range(num_tasks):
                nursery.start_soon(make_request, pool, url, extensions)

        print(f"- Finished saturation\n")
        report(pool)

        # Control
        try:
            await pool.request("GET", "https://en.wikipedia.org/wiki/2023", extensions=extensions)  # PoolTimeout
        except httpcore.PoolTimeout:
            print("- ERROR: Got pool timeout")
            traceback.print_exc(chain=True)
        else:
            print("- No pool timeout!")


trio.run(main)

I was able to confirm the ReadError/PoolTimeout combination caused the error, and that it resulted in a state where we’re handling the PoolTimeout, but the connection has been assigned to the waiting request.

I was also able to confirm the suggested fix, although I’m not yet entirely comfortable with it. It seems a little brittle, and I’d prefer to understand first if we’re able to avoid this state.

I wonder if we’re able to reliably reproduce this state, using a mocked out network backend similar to some of the cases in tests/_async/test_connection_pool.py?

Not stale.

An observation after I played with your test code locally, @nihilSup:

We can reproduce this zombie situation without ReadTimeout, and even without the hanging backend:

under tests/_async/test_connection_pool.py

@pytest.mark.trio
async def test_pool_timeout_connection_cleanup():
    network_backend = AsyncMockBackend(
        [
            b"HTTP/1.1 200 OK\r\n",
            b"Content-Type: plain/text\r\n",
            b"Content-Length: 13\r\n",
            b"\r\n",
            b"Hello, world!",
        ]
    )

    async with AsyncConnectionPool(
        network_backend=network_backend, max_connections=2
    ) as pool:
        with pytest.raises(PoolTimeout):
            extensions = {"timeout": {"pool": 0}}
            await pool.request("GET", "https://example.com/", extensions=extensions)

        # wait for a considerable amount of time to make sure all requests time out
        await concurrency.sleep(1)

        print(pool.connections)  # [<AsyncHTTPConnection [CONNECTING]>] <- bad, should be empty
        print(pool._requests)    # [] <- as expected; both lists should be empty here

I think the reason this achieves the same outcome is that, because we set the timeout to 0, status.wait_for_connection(timeout=0) immediately raises PoolTimeout, but before that the status was already assigned a connection in https://github.com/encode/httpcore/blob/7eb20224833b8165d0945d08ac8f714ccb6750b9/httpcore/_async/connection_pool.py#L221. This could be a more minimal reproduction of the scenario. The difference is that in the real situation the connection assignment happens in another request, via the ReadTimeout.

I wonder if we’re able to reliably reproduce this state, using a mocked out network backend similar to some of the cases in tests/_async/test_connection_pool.py?

@tomchristie Over the weekend I prepared a draft PR with tests for both the sync and async implementations.

httpcore/backends/mock.py
class HangingStream(MockStream):
    def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
        if self._closed:
            raise ReadError("Connection closed")
        time.sleep(timeout or 0.1)
        raise ReadTimeout


class MockBackend(NetworkBackend):
    def __init__(
            self,
            buffer: typing.List[bytes],
            http2: bool = False,
            resp_stream_cls: Optional[Type[NetworkStream]] = None,
    ) -> None:
        self._buffer = buffer
        self._http2 = http2
        self._resp_stream_cls: Type[MockStream] = resp_stream_cls or MockStream

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: Optional[float] = None,
        local_address: Optional[str] = None,
    ) -> NetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    def connect_unix_socket(
        self, path: str, timeout: Optional[float] = None
    ) -> NetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    def sleep(self, seconds: float) -> None:
        pass


class AsyncHangingStream(AsyncMockStream):
    async def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
        if self._closed:
            raise ReadError("Connection closed")
        await anyio.sleep(timeout or 0.1)
        raise ReadTimeout


class AsyncMockBackend(AsyncNetworkBackend):
    def __init__(
            self,
            buffer: typing.List[bytes],
            http2: bool = False,
            resp_stream_cls: Optional[Type[AsyncNetworkStream]] = None,
    ) -> None:
        self._buffer = buffer
        self._http2 = http2
        self._resp_stream_cls: Type[AsyncMockStream] = resp_stream_cls or AsyncMockStream

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: Optional[float] = None,
        local_address: Optional[str] = None,
    ) -> AsyncNetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    async def connect_unix_socket(
        self, path: str, timeout: Optional[float] = None
    ) -> AsyncNetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    async def sleep(self, seconds: float) -> None:
        pass
tests/_async/test_connection_pool.py
@pytest.mark.trio
async def test_pool_under_load():
    """
    Pool must remain operational after some peak load.
    """
    network_backend = AsyncMockBackend([], resp_stream_cls=AsyncHangingStream)

    async def fetch(_pool: AsyncConnectionPool, *exceptions: Type[BaseException]):
        with contextlib.suppress(*exceptions):
            async with _pool.stream(
                    "GET",
                    "http://a.com/",
                    extensions={
                        "timeout": {
                            "connect": 0.1,
                            "read": 0.1,
                            "pool": 0.1,
                            "write": 0.1,
                        },
                    },
            ) as response:
                await response.aread()

    async with AsyncConnectionPool(
        max_connections=1, network_backend=network_backend
    ) as pool:
        async with concurrency.open_nursery() as nursery:
            for _ in range(300):
                # Send many requests to the same URL. All but one will fail with
                # PoolTimeout; one will finish with ReadTimeout.
                nursery.start_soon(fetch, pool, PoolTimeout, ReadTimeout)
        if pool.connections:  # There is one connection left in the pool, in the "CONNECTING" state
            assert pool.connections[0].is_connecting()
        with pytest.raises(ReadTimeout):  # ReadTimeout means a connection could actually be acquired
            await fetch(pool)
tests/_sync/test_connection_pool.py
def test_pool_under_load():
    """
    Pool must remain operational after some peak load.
    """
    network_backend = MockBackend([], resp_stream_cls=HangingStream)

    def fetch(_pool: ConnectionPool, *exceptions: Type[BaseException]):
        with contextlib.suppress(*exceptions):
            with _pool.stream(
                    "GET",
                    "http://a.com/",
                    extensions={
                        "timeout": {
                            "connect": 0.1,
                            "read": 0.1,
                            "pool": 0.1,
                            "write": 0.1,
                        },
                    },
            ) as response:
                response.read()

    with ConnectionPool(
        max_connections=1, network_backend=network_backend
    ) as pool:
        with concurrency.open_nursery() as nursery:
            for _ in range(300):
                # Send many requests to the same URL. All but one will fail with
                # PoolTimeout; one will finish with ReadTimeout.
                nursery.start_soon(fetch, pool, PoolTimeout, ReadTimeout)
        if pool.connections:  # There is one connection left in the pool, in the "CONNECTING" state
            assert pool.connections[0].is_connecting()
        with pytest.raises(ReadTimeout):  # ReadTimeout means a connection could actually be acquired
            fetch(pool)

The `assert pool.connections[0].is_connecting()` check can be skipped, because it relies on a helper method I added in the draft.
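
For context, a plausible shape for that helper (my assumption based on the connection attributes inspected earlier in the thread; it is not part of httpcore's API):

# Hypothetical method on the (Async)HTTPConnection wrapper:
def is_connecting(self) -> bool:
    # A wrapper that has been placed in the pool but has neither established a
    # real connection nor recorded a failure is still "CONNECTING".
    return self._connection is None and not self._connect_failed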

@tomchristie @florimondmanca apologies for the ping, but can you take a look at the comments here? It seems like an important issue.