httpcore: anyio.BrokenResourceError exception not caught?

Hello,

I spotted several cases where an anyio exception made my program crash.

The error mentioned every time is anyio.BrokenResourceError, but the traceback varies.

I collected the following ones:

Future exception was never retrieved
future: <Future finished exception=BrokenResourceError()>
Traceback (most recent call last):
  File "/usr/lib64/python3.8/asyncio/selector_events.py", line 848, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:

anyio.BrokenResourceError

Future exception was never retrieved
future: <Future finished exception=BrokenResourceError()>
Traceback (most recent call last):
  File "/usr/lib64/python3.8/asyncio/selector_events.py", line 848, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
TimeoutError: [Errno 110] Connection timed out

The above exception was the direct cause of the following exception:

anyio.BrokenResourceError

Future exception was never retrieved
future: <Future finished exception=BrokenResourceError()>
Traceback (most recent call last):
  File "/usr/lib64/python3.8/asyncio/selector_events.py", line 848, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/devloop/.local/share/virtualenvs/wapiti-p7I6n6KS/lib/python3.8/site-packages/httpcore/_backends/anyio.py", line 60, in read
    return await self.stream.receive(n)
  File "/home/devloop/.local/share/virtualenvs/wapiti-p7I6n6KS/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1093, in receive
    raise self._protocol.exception
anyio.BrokenResourceError

What I was expecting: an httpx exception instead (RequestError).

Used libraries and versions:

aiocache==0.11.1
anyio==3.2.1
async-timeout==3.0.1
httpcore==0.13.6
httpx==0.18.2

Python 3.8.10

OS: Linux (openSUSE Tumbleweed)

I managed to reproduce it with the following script. Unfortunately, it doesn’t happen with every website.

import asyncio
import signal
import sys

import httpx

MAX_TASKS = 30

stop_event = asyncio.Event()


def stop_attack_process():
    global stop_event
    print("Stopping tasks")
    stop_event.set()


class Buster:
    def __init__(self, root_url: str, payloads_file: str, event: asyncio.Event):
        self._client = httpx.AsyncClient(timeout=10)
        self._root_url = root_url
        self._payloads_file = payloads_file
        self._stop_event = event
        self.network_errors = 0

    async def close(self):
        await self._client.aclose()

    async def check_url(self, url):
        response = await self._client.get(url)
        if response.status_code == 200:
            return True, url
        return False, url

    async def brute(self):
        tasks = set()
        pending_count = 0
        payload_iterator = open(self._payloads_file, errors="ignore")

        while True:
            if pending_count < MAX_TASKS and not self._stop_event.is_set():
                try:
                    candidate = next(payload_iterator)
                except StopIteration:
                    pass
                else:
                    candidate = candidate.strip()
                    if not candidate:
                        continue

                    url = self._root_url + candidate
                    task = asyncio.create_task(self.check_url(url))
                    tasks.add(task)

            if not tasks:
                break

            done_tasks, pending_tasks = await asyncio.wait(
                tasks,
                timeout=0.01,
                return_when=asyncio.FIRST_COMPLETED
            )
            pending_count = len(pending_tasks)
            for task in done_tasks:
                try:
                    result, url = await task
                except httpx.RequestError:
                    self.network_errors += 1
                else:
                    if result:
                        print(f"Found {url}")
                tasks.remove(task)

            if self._stop_event.is_set():
                print("pending tasks:", pending_count)
                for task in pending_tasks:
                    task.cancel()
                    tasks.remove(task)


async def main(root_url: str):
    global stop_event
    filename = "busterPayloads.txt"
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, stop_attack_process)
    buster = Buster(root_url, filename, stop_event)
    await buster.brute()
    loop.remove_signal_handler(signal.SIGINT)
    await buster.close()

if __name__ == "__main__":
    asyncio.run(main(sys.argv[1]))

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 43 (22 by maintainers)

Most upvoted comments

Hi,

Just a thought: this seems like an exception we ought to be catching in HTTPCore's anyio backend. Resolution would involve adding an extra catch case there, I believe. 😃
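
A minimal sketch of what such an extra catch case could look like, assuming a mapping-based context manager wrapped around the read call. `BrokenResourceError` and `ReadError` below are local stand-ins (hypothetical, so the sketch runs without anyio or httpcore installed), and `read_from_broken_stream()` is a hypothetical simulation of `stream.receive()` failing; httpcore's real backend code may be organized differently.

```python
import contextlib
from typing import Dict, Iterator, Type


class BrokenResourceError(Exception):
    """Stand-in for anyio.BrokenResourceError (hypothetical, for illustration)."""


class ReadError(Exception):
    """Stand-in for httpcore.ReadError (hypothetical, for illustration)."""


@contextlib.contextmanager
def map_exceptions(mapping: Dict[Type[Exception], Type[Exception]]) -> Iterator[None]:
    """Re-raise any exception listed in `mapping` as its mapped type, keeping the cause."""
    try:
        yield
    except Exception as exc:
        for from_exc, to_exc in mapping.items():
            if isinstance(exc, from_exc):
                raise to_exc(str(exc) or type(exc).__name__) from exc
        raise  # unmapped exceptions propagate unchanged


def read_from_broken_stream() -> bytes:
    # Simulates stream.receive() failing after the peer reset the connection.
    try:
        raise ConnectionResetError(104, "Connection reset by peer")
    except ConnectionResetError as reset:
        raise BrokenResourceError() from reset


def read() -> bytes:
    # The "extra catch case": BrokenResourceError now surfaces as ReadError.
    with map_exceptions({BrokenResourceError: ReadError}):
        return read_from_broken_stream()
```

Chaining with `from exc` keeps the original error reachable through `__cause__`, so a caller that catches the higher-level exception (for httpx, something in the `RequestError` family) still sees the underlying `ConnectionResetError` in the traceback.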

I can repro the error with your sample script. I’ll try to figure out what is going on here.

Had it running for a week and have not seen the exception since then. I think this is now fixed and we can close it. Thanks again for the help!

Ok, I finally figured out what’s happening here. The Future that never got its exception retrieved is StreamProtocol.write_future from AnyIO’s asyncio backend.

I managed to factor out httpx from your sample script, simplifying it into this:

import socket
import threading

import anyio

HOST = "localhost"
PORT = 8833


ssck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssck.bind((HOST, PORT))
ssck.listen(1)


def server():
    sck, addr = ssck.accept()
    del sck  # required to trigger the problem!


thread = threading.Thread(target=server, daemon=True)
thread.start()


async def hello():
    stream = await anyio.connect_tcp(HOST, PORT)
    await stream.send(b'GET')
    try:
        await stream.receive()
    except Exception as e:
        print(e)

    print("finished")


anyio.run(hello)

What happens here is the combination of the following:

  1. The client writes into the socket (if this step is skipped, the error is not triggered)
  2. The server aborts the connection (merely closing the accepted socket does not trigger the error)
  3. The StreamProtocol.connection_lost() callback gets called with ConnectionResetError
  4. The callback resolves its write_future using BrokenResourceError, with that ConnectionResetError as the cause
  5. The client tries to read from the socket, causing a new BrokenResourceError to be raised
  6. Since nobody tries to write to the stream any more, write_future never passes on the first BrokenResourceError, hence the warning
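
The same sequence can be sketched with plain asyncio and no sockets: a Future resolved with an exception that nobody awaits or inspects is reported when it is garbage-collected. In this sketch `write_future` only imitates `StreamProtocol.write_future`, a `RuntimeError` stands in for `anyio.BrokenResourceError`, and `unretrieved_reports()` is a hypothetical helper that records, through a custom exception handler, what asyncio would otherwise log.

```python
import asyncio
import gc
from typing import List


def unretrieved_reports(retrieve: bool) -> List[str]:
    """Return the messages asyncio passes to the loop's exception handler."""
    messages: List[str] = []

    async def main() -> None:
        loop = asyncio.get_running_loop()
        # Record reports instead of letting asyncio log them to stderr.
        loop.set_exception_handler(lambda _loop, ctx: messages.append(ctx["message"]))

        # Stand-in for StreamProtocol.write_future (step 4): resolved from a
        # connection_lost()-style callback with an error chained to the reset.
        write_future = loop.create_future()
        error = RuntimeError("stand-in for anyio.BrokenResourceError")
        error.__cause__ = ConnectionResetError(104, "Connection reset by peer")
        write_future.set_exception(error)

        if retrieve:
            write_future.exception()  # marks the exception as retrieved
        # Step 6: nobody awaits the Future, so dropping it triggers the report.
        del write_future
        gc.collect()  # CPython finalizes at `del`; other interpreters may need this

    asyncio.run(main())
    return messages


print(unretrieved_reports(retrieve=False))  # ['Future exception was never retrieved']
print(unretrieved_reports(retrieve=True))   # []
```

Calling `exception()` (or awaiting the Future) is what marks the error as retrieved; that is why the exception raised on the read path does not produce the warning, while the one parked in the unused write Future does.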

While this causes an annoying warning to be printed, it doesn’t really break anything. Nevertheless, this is an AnyIO bug that should be fixed. Another oddity is that, when writing a test based on this script, I got EndOfStream from the asyncio backend and BrokenResourceError from trio. If I place await sleep(0.1) before the send(), all backends raise EndOfStream. Sigh 😩

OK, AnyIO v3.3.1 is out now.

It was actually a really simple problem caused by PyPy’s different GC semantics.

It seems that the new test hangs on PyPy3 which is strange. I will sort this out tomorrow.

Once this PR goes in (I will wait for a capable reviewer first), it should eliminate the problem.

I thought the exception was the one to eliminate

The BrokenResourceError that we are catching with pytest.raises? No, that should be raised in this situation. We wanted to eliminate the other BrokenResourceError, the one in the write_future. That’s why we need to trigger the garbage collection and look at the captured logs.
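
A sketch of that kind of check, assuming plain asyncio and the standard `logging` module: asyncio's default exception handler reports unretrieved Futures through the `asyncio` logger, so a test can force a collection with `gc.collect()` and then look at what was logged. The names `ListHandler`, `leaked_future_reports`, `leaky` and `clean` are hypothetical; in a pytest suite the `caplog` fixture would typically replace `ListHandler`.

```python
import asyncio
import gc
import logging
from typing import Awaitable, Callable, List


class ListHandler(logging.Handler):
    """Keeps log messages in memory (pytest's caplog fixture does the same job)."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def leaked_future_reports(scenario: Callable[[], Awaitable[None]]) -> List[str]:
    """Run `scenario`, force a GC pass, return asyncio's 'never retrieved' log messages."""
    handler = ListHandler()
    logger = logging.getLogger("asyncio")  # asyncio's default exception handler logs here
    logger.addHandler(handler)
    try:
        async def runner() -> None:
            await scenario()
            gc.collect()  # finalize leftover Futures while the loop still runs

        asyncio.run(runner())
        gc.collect()
    finally:
        logger.removeHandler(handler)
    return [msg for msg in handler.messages if "never retrieved" in msg]


async def leaky() -> None:
    # Hypothetical scenario: a Future fails and nobody looks at it.
    fut = asyncio.get_running_loop().create_future()
    fut.set_exception(RuntimeError("boom"))


async def clean() -> None:
    fut = asyncio.get_running_loop().create_future()
    fut.set_exception(RuntimeError("boom"))
    fut.exception()  # retrieved, so nothing should be reported


assert len(leaked_future_reports(leaky)) == 1
assert leaked_future_reports(clean) == []
```

The explicit `gc.collect()` calls matter on interpreters without reference counting, such as PyPy, where an unreferenced Future may otherwise be finalized only much later.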

(If we fix the underlying issue, shouldn't the test pass for all of them, not just some?)

Indeed it should, that is how regression tests work 😃

Thanks, I’ll work on this later today.

I wrote this test (in test_sockets.py, under TestTCPStream):

    async def test_server_crash_after_write(self, family: AnyIPAddressFamily) -> None:
        """
        Tests that there won't be any leftover Futures that don't get their exceptions retrieved.

        See https://github.com/encode/httpcore/issues/382 for details.

        """
        def serve() -> None:
            sock, addr = server_sock.accept()
            sock.close()
            del sock

        server_sock = socket.socket(family, socket.SOCK_STREAM)
        server_sock.settimeout(1)
        server_sock.bind(('localhost', 0))
        server_sock.listen()
        thread = Thread(target=serve, daemon=True)
        thread.start()
        server_addr = server_sock.getsockname()[:2]
        async with await connect_tcp(*server_addr) as stream:
            await sleep(0.1)
            await stream.send(b'hello')
            with pytest.raises(BrokenResourceError):
                await stream.receive()

        thread.join()

Ok, it seems that on trio, httpcore doesn’t use its anyio backend but the trio backend instead (which obviously never raises any AnyIO exceptions). This little fact had escaped my attention and explains why it didn’t trip my debug lines when running on trio.

Both backends raise httpcore.ReadError when trying to read from the socket which is correct. I’m now trying to find out if there is some leftover task that somehow leaked the BrokenResourceError on asyncio.

The odd thing here is that with the trio backend, SocketStream.read() never even gets called. The end result would surely be a BrokenResourceError on trio too if that method was ever called. Next I will figure out the cause for this discrepancy.

Thanks for taking a look! Yeah, the exception was handled when the request was executed (the print(e) statement was hit during my test), but asyncio seems to disagree. (The error message was printed after “finished”, once everything had been GCed.)

This happens when an exception leaks out of an asynchronous callback (either a task or a low-level callback). The exception is raised in another task, so you cannot possibly catch it with your own except clause.

I’m taking another look at this bug today and managed to construct a minimal script that reproduces the issue. It reproduces reliably on my end.
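
A small illustration with plain asyncio (not taken from httpx or anyio): an exception raised inside a low-level callback scheduled with `loop.call_soon()` never reaches the `try`/`except` in the coroutine that scheduled it; the event loop passes it to its exception handler instead. `run_demo()` and `failing_callback()` are hypothetical names, and the `ConnectionResetError` is just an arbitrary example error.

```python
import asyncio
from typing import Any, Dict, List


def run_demo() -> Dict[str, List[Any]]:
    seen: Dict[str, List[Any]] = {"caught": [], "handler": []}

    def failing_callback() -> None:
        raise ConnectionResetError(104, "Connection reset by peer")

    async def main() -> None:
        loop = asyncio.get_running_loop()
        # Errors from callbacks are delivered here, not to the caller.
        loop.set_exception_handler(
            lambda _loop, ctx: seen["handler"].append(ctx.get("exception"))
        )
        try:
            loop.call_soon(failing_callback)
            await asyncio.sleep(0.01)  # let the callback run
        except Exception as exc:  # never triggered: the error is raised elsewhere
            seen["caught"].append(exc)

    asyncio.run(main())
    return seen


result = run_demo()
print(len(result["caught"]), len(result["handler"]))  # 0 1
```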

import socket
import threading

import anyio
import httpx

HOST = "localhost"
PORT = 8833


ssck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssck.bind((HOST, PORT))
ssck.listen(1)


def server():
    while True:
        sck, addr = ssck.accept()
        raise Exception("server crashed")


thread = threading.Thread(target=server)
thread.start()
client = httpx.AsyncClient()


async def hello():
    try:
        await client.get(f"http://{HOST}:{PORT}")
    except Exception as e:
        print(e)
    await client.aclose()
    print("finished")


anyio.run(hello)

The next step is to look into httpcore’s code to see how to patch it. I would love some help on this if the team has time to take a look.