aio-pika: How to exit QueryIterator?

I know it is not the first time it is posted here but still, I could not find a working solution. My problem is that my application runs in CLI mode and doesn’t react on ^C property. Here is an example:

import asyncio

import aio_pika


async def main():
    async def get_connection():
        return await aio_pika.connect_robust('amqp://guest:guest@localhost')

    pool = aio_pika.pool.Pool(get_connection)
    async with pool.acquire() as connection:
        async with connection:
            channel = await connection.channel()
            queue = await channel.declare_queue('my_queue_name', durable=True)
            async with queue.iterator() as stream:
                async for message in stream:
                    print(message)


if __name__ == '__main__':
    asyncio.run(main())

At this moment everything works well. Messages are received and printed. Then, when I send CTRL+C (SIGINT) to the process, my program does not close. The messages are no longer received but the program runs.

I have to press ^C again and only then the process gets killed with this exception trace:

Click to see the trace
^C^CTraceback (most recent call last):
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
    self.run_forever()
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 534, in run_forever
    self._run_once()
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 1735, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "amqp_test.py", line 24, in <module>
    asyncio.run(main())
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/runners.py", line 46, in run
    _cancel_all_tasks(loop)
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/runners.py", line 62, in _cancel_all_tasks
    tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
    self.run_forever()
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 534, in run_forever
    self._run_once()
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 1735, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
Exception ignored in: <coroutine object main at 0x7f4b216f9680>
Traceback (most recent call last):
  File "amqp_test.py", line 18, in main
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/connection.py", line 212, in __aexit__
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 190, in close
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/connection.py", line 33, in close
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/base.py", line 149, in close
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 398, in create_task
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 475, in _check_closed
RuntimeError: Event loop is closed
sys:1: RuntimeWarning: coroutine 'Base.__closer' was never awaited
sys:1: RuntimeWarning: coroutine 'QueueIterator.close' was never awaited
Task was destroyed but it is pending!
task: <Task pending coro=<QueueIterator.close() running at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/queue.py:390> wait_for=<Task pending coro=<Channel.rpc() running at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/channel.py:137> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f4b20b49790>()]> cb=[FutureStore.__on_task_done.<locals>.remover() at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/base.py:51, <1 more>, <TaskWakeupMethWrapper object at 0x7f4b20225d10>()]> cb=[shield.<locals>._inner_done_callback() at /home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/tasks.py:803]>
Task was destroyed but it is pending!
task: <Task pending coro=<Channel.rpc() running at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/channel.py:137> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f4b20b49790>()]> cb=[FutureStore.__on_task_done.<locals>.remover() at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/base.py:51, <1 more>, <TaskWakeupMethWrapper object at 0x7f4b20225d10>()]>
Exception ignored in: <coroutine object Channel.rpc at 0x7f4b20ae0ef0>
Traceback (most recent call last):
  File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/channel.py", line 137, in rpc
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/queues.py", line 161, in get
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 683, in call_soon
  File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 475, in _check_closed
RuntimeError: Event loop is closed

Expected behavior:

The program should exit on the first ^C.

About this issue

  • Original URL
  • State: open
  • Created 4 years ago
  • Reactions: 12
  • Comments: 22 (4 by maintainers)

Most upvoted comments

For me the complexity of the solutions required to cleanly exit from QueueIterator nullify its benefits. I find a pattern utilizing an exit event and callback for the message much cleaner.


import signal
import asyncio
import functools
import aio_pika

async def on_message(msg):                                                                          
    print(msg)

async def main():

    kill_event = asyncio.Event()                                                                    
                                                                                                    
    async def shutdown():                                                            
        kill_event.set()                                                                            
                                                                                                    
    asyncio.get_running_loop().add_signal_handler(                                                  
        signal.SIGINT, functools.partial(asyncio.create_task, shutdown())                           
    )

    connection = aio_pika.connect_robust('amqp://guest:guest@localhost')
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue('my_queue')
        await queue.consume(on_message)
        await kill_event.wait()
    print('exiting') 

asyncio.run(main())

QueryIterator.__anext__ will never throw a StopAsyncIteration, does that mean intended usage is to always provide a timeout to queue.iterator()?

I still don’t know how to properly exit QueryIterator, but at least I fixed the “double Ctrl+C” issue by using loop.run_until_complete instead of asyncio.run… even without passing the loop to connect or connect_robust. And the reason is the cancellation of tasks:

https://github.com/python/cpython/blob/v3.8.5/Lib/asyncio/runners.py#L46

There is one aio-pika task that is getting stuck and does not react well to cancellation. After creating a custom signal handler that cancel()s the remaining tasks one by one on SIGINT, I suspect the problem lies in aiormq.Connection.__heartbeat_task.

Interestingly I’ve hit a similar problem. If I forcibly cancel ayncio.all_tasks() after (say) 5 seconds, I see this error reported by aio_pika:

Closing channel <Channel: "1"> because RPC call <pamqp.specification.Basic.Cancel object at 0x7f917a979070> cancelled

Shoving a bunch of breakpoints all over the place, it looks (and I could be very wrong) that it’s the QueueIterator’s __aexit__() which calls a cancel - with no timeout. (It hangs indefinitely waiting for RabbitMQ to acknowledge the shutdown - I guess?

class QueueIterator:
    @shield
    async def close(self):
        if not self._consumer_tag:
            return

        await self._amqp_queue.cancel(self._consumer_tag)

Am I barking up the wrong tree?

aio-pika is a high level wrapper around aiormq and contain helpers for autoreconnects and some useful tricks. You can use aiormq as well but have to handle reconnects etc.

I still don’t know how to properly exit QueryIterator, but at least I fixed the “double Ctrl+C” issue by using loop.run_until_complete instead of asyncio.run… even without passing the loop to connect or connect_robust. And the reason is the cancellation of tasks:

https://github.com/python/cpython/blob/v3.8.5/Lib/asyncio/runners.py#L46

There is one aio-pika task that is getting stuck and does not react well to cancellation. After creating a custom signal handler that cancel()s the remaining tasks one by one on SIGINT, I suspect the problem lies in aiormq.Connection.__heartbeat_task.