falcon: bug: sse-stream not properly consumed

I recently implemented an async route where I “awaited” some result, but had to send a “keep-alive” message in case the result takes longer than a certain time (one minute in my case).

The exact same code works when setting the generator as the value of response.stream, but gets stuck when setting it as response.sse instead (it does work for small payloads, though).

This is my code:

```python
# works:
async def wait_for_result_and_yield_keepalives():
    # defined inside the responder; `awaitable` holds the pending result
    nonlocal awaitable
    while True:
        done, pending = await asyncio.wait((awaitable,), timeout=30)
        if not done:
            logger.info('sending keep-alive')
            # keep the Task returned by asyncio.wait so it is not re-wrapped
            awaitable = pending.pop()
            yield b' '
            continue
        break
    result = done.pop().result()
    logger.info(f'scan done {result=}')
    yield json.dumps(result).encode('utf-8')

resp.stream = wait_for_result_and_yield_keepalives()
```

```python
# does not work (will get stuck):
async def wait_for_result_and_yield_keepalives():
    nonlocal awaitable
    while True:
        done, pending = await asyncio.wait((awaitable,), timeout=30)
        if not done:
            logger.info('sending keep-alive')
            awaitable = pending.pop()
            yield falcon.asgi.SSEvent(event='keep-alive')
            continue
        break
    result = done.pop().result()
    logger.info(f'scan done {result=}')
    yield falcon.asgi.SSEvent(event='result', json=result)

resp.sse = wait_for_result_and_yield_keepalives()
```

To be fair, I am so far not very familiar with “async”, so maybe my code is conceptually flawed (feedback welcome…). However, I would still expect the behaviour of response.stream and response.sse to be consistent.

Maybe there are more elegant ways of sending those “keep-alives”? I have to send them because, in my Kubernetes environment, stale HTTP connections are closed (I think by the load balancer) after about one minute of inactivity (just in case you wonder why I want to do such a thing in the first place).
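In case it is useful, here is a minimal sketch of one possibly tidier pattern (untested, and the names are illustrative): wrap the pending work in a Task once, then use asyncio.wait_for together with asyncio.shield, so a timeout yields a keep-alive without cancelling the underlying task:

```python
import asyncio
import json

async def result_with_keepalives(awaitable, interval=30):
    # Wrap once in a Task so it keeps running across repeated timeouts.
    task = asyncio.ensure_future(awaitable)
    while True:
        try:
            # shield() protects the task when wait_for cancels on timeout.
            result = await asyncio.wait_for(asyncio.shield(task), timeout=interval)
        except asyncio.TimeoutError:
            yield b' '  # keep-alive byte for intermediaries that drop idle connections
            continue
        yield json.dumps(result).encode('utf-8')
        return
```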

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 26 (18 by maintainers)

Most upvoted comments

Looking at the relevant piece of falcon.asgi.App’s source code, though, I think I see a potential cause.

resp.sse has a simple disconnect watcher, and while it is handy to have, there are at least two unwanted side effects for your scenario (see the sketch after the list):

* The watcher is awaiting ASGI `receive()` continuously, which will effectively drain your request stream, robbing you of some input data.

* The watcher stops iterating upon the client's premature disconnect and leaves behind your SSE event generator, which may or may not make it hang given the pattern you are using.
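To make the mechanism concrete, here is a hedged sketch of roughly what such a watcher does (simplified for illustration; not Falcon’s actual source):

```python
async def watch_disconnect(receive, emitter_task):
    while True:
        event = await receive()  # also consumes any pending request-body chunks
        if event['type'] == 'http.disconnect':
            # the emitter is cancelled/abandoned; a generator blocked inside
            # asyncio.wait may never get to run its remaining code
            emitter_task.cancel()
            break
```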

I was reading the ASGI spec: https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event

> Disconnect - receive event. Sent to the application when a HTTP connection is closed or if receive is called after a response has been sent. This is mainly useful for long-polling, where you may want to trigger cleanup code if the connection closes early.

Am I correct in reading that receive can only yield a disconnect event after a body has been sent? If a partial body (`more_body=True`) is valid, it seems this case could be avoided if the watcher were started only after the first SSE event has been sent.

But I have no clue in this case where any remaining incoming request body would end up. Discarded by the server?
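At the raw ASGI level, the idea above might look roughly like this (an illustrative sketch, not Falcon’s implementation):

```python
async def app(scope, receive, send):
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [(b'content-type', b'text/event-stream')],
    })
    # First SSE event goes out as a partial body...
    await send({
        'type': 'http.response.body',
        'body': b': keep-alive\n\n',
        'more_body': True,
    })
    # ...and only afterwards would a watcher start polling receive()
    # for 'http.disconnect', once a response is already underway.
```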

asamof, I could also share the full code; it is just ~170 lines and has a single dependency: a running instance of clamd.

In a pinch, that’s doable too 👍

I need to hit the hay now though, it’s getting late here (err, early). Hopefully I’ll have more time to take a look at this on Friday.

Oh dear. So basically I just need to wrap this in a Task? Will check.

I think `wait` does that for you.
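For reference, asyncio.wait did wrap bare coroutines into Tasks automatically, but that behaviour was deprecated in Python 3.8 and removed in 3.11, so explicit wrapping is the forward-compatible pattern (a minimal sketch):

```python
import asyncio

async def wait_with_timeout(coro, timeout=30):
    task = asyncio.ensure_future(coro)  # explicit Task; required on Python 3.11+
    done, pending = await asyncio.wait({task}, timeout=timeout)
    # `task` can safely be re-awaited on a later iteration if still pending
    return done, pending
```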