aio-pika: How to exit QueueIterator?
I know this is not the first time this has been posted here, but I still could not find a working solution. My problem is that my application runs in CLI mode and doesn’t react to ^C properly. Here is an example:
import asyncio
import aio_pika


async def main():
    async def get_connection():
        return await aio_pika.connect_robust('amqp://guest:guest@localhost')

    pool = aio_pika.pool.Pool(get_connection)
    async with pool.acquire() as connection:
        async with connection:
            channel = await connection.channel()
            queue = await channel.declare_queue('my_queue_name', durable=True)
            async with queue.iterator() as stream:
                async for message in stream:
                    print(message)


if __name__ == '__main__':
    asyncio.run(main())
At this point everything works well: messages are received and printed. Then, when I send Ctrl+C (SIGINT) to the process, my program does not close. Messages are no longer received, but the program keeps running.
I have to press ^C again and only then the process gets killed with this exception trace:
^C^CTraceback (most recent call last):
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
self.run_forever()
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 534, in run_forever
self._run_once()
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 1735, in _run_once
event_list = self._selector.select(timeout)
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/selectors.py", line 468, in select
fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "amqp_test.py", line 24, in <module>
asyncio.run(main())
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/runners.py", line 46, in run
_cancel_all_tasks(loop)
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/runners.py", line 62, in _cancel_all_tasks
tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 566, in run_until_complete
self.run_forever()
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 534, in run_forever
self._run_once()
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 1735, in _run_once
event_list = self._selector.select(timeout)
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/selectors.py", line 468, in select
fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
Exception ignored in: <coroutine object main at 0x7f4b216f9680>
Traceback (most recent call last):
File "amqp_test.py", line 18, in main
File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/connection.py", line 212, in __aexit__
File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 190, in close
File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/connection.py", line 33, in close
File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/base.py", line 149, in close
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 398, in create_task
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 475, in _check_closed
RuntimeError: Event loop is closed
sys:1: RuntimeWarning: coroutine 'Base.__closer' was never awaited
sys:1: RuntimeWarning: coroutine 'QueueIterator.close' was never awaited
Task was destroyed but it is pending!
task: <Task pending coro=<QueueIterator.close() running at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aio_pika/queue.py:390> wait_for=<Task pending coro=<Channel.rpc() running at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/channel.py:137> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f4b20b49790>()]> cb=[FutureStore.__on_task_done.<locals>.remover() at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/base.py:51, <1 more>, <TaskWakeupMethWrapper object at 0x7f4b20225d10>()]> cb=[shield.<locals>._inner_done_callback() at /home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/tasks.py:803]>
Task was destroyed but it is pending!
task: <Task pending coro=<Channel.rpc() running at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/channel.py:137> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f4b20b49790>()]> cb=[FutureStore.__on_task_done.<locals>.remover() at /home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/base.py:51, <1 more>, <TaskWakeupMethWrapper object at 0x7f4b20225d10>()]>
Exception ignored in: <coroutine object Channel.rpc at 0x7f4b20ae0ef0>
Traceback (most recent call last):
File "/home/alex/projects/chicago_poc/.venv/lib/python3.7/site-packages/aiormq/channel.py", line 137, in rpc
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/queues.py", line 161, in get
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 683, in call_soon
File "/home/alex/.pyenv/versions/3.7.5/lib/python3.7/asyncio/base_events.py", line 475, in _check_closed
RuntimeError: Event loop is closed
Expected behavior:
The program should exit on the first ^C.
About this issue
- State: open
- Created 4 years ago
- Reactions: 12
- Comments: 22 (4 by maintainers)
For me, the complexity of the solutions required to cleanly exit from QueueIterator nullifies its benefits. I find a pattern utilizing an exit event and a callback for the message much cleaner.
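The exit-event-plus-callback pattern mentioned here might look roughly like the sketch below. It is only an illustration: `consume_until` and `on_message` are hypothetical names, the URL and queue name are taken from the example in the question, and `add_signal_handler` is assumed to be available (Unix event loops only).

```python
import asyncio
import signal


async def consume_until(queue, on_message, stop_event):
    """Consume with a callback until stop_event is set, then stop consuming."""
    # queue.consume() registers the callback and returns a consumer tag.
    consumer_tag = await queue.consume(on_message)
    # Park here; messages are delivered to on_message in the background.
    await stop_event.wait()
    # Tell the broker to stop delivering to this consumer.
    await queue.cancel(consumer_tag)


async def on_message(message):
    # Hypothetical handler: the message is acknowledged when the block exits cleanly.
    async with message.process():
        print(message.body)


async def main():
    import aio_pika  # imported lazily; requires a reachable RabbitMQ broker

    stop_event = asyncio.Event()
    # Unix only: the first Ctrl+C sets the event instead of raising KeyboardInterrupt.
    asyncio.get_running_loop().add_signal_handler(signal.SIGINT, stop_event.set)

    connection = await aio_pika.connect_robust('amqp://guest:guest@localhost')
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue('my_queue_name', durable=True)
        await consume_until(queue, on_message, stop_event)
```

To try it, call `asyncio.run(main())` with a broker running. Because the main coroutine returns normally after the event is set, no iterator has to be interrupted in the middle of waiting for a message.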
QueueIterator.__anext__ will never throw a StopAsyncIteration; does that mean the intended usage is to always provide a timeout to queue.iterator()?
Interestingly, I’ve hit a similar problem. If I forcibly cancel asyncio.all_tasks() after (say) 5 seconds, I see this error reported by aio_pika:
Shoving a bunch of breakpoints all over the place, it looks (and I could be very wrong) like it’s the QueueIterator’s __aexit__() which calls a cancel with no timeout. (It hangs indefinitely waiting for RabbitMQ to acknowledge the shutdown, I guess?) Am I barking up the wrong tree?
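If the hang really is a cleanup call waiting on the broker with no deadline, one way to bound it from the calling side is `asyncio.wait_for`. This is a minimal sketch, not aio-pika's own behavior: `close_with_timeout` is a hypothetical helper, and it only helps if the awaited coroutine actually reacts to cancellation.

```python
import asyncio


async def close_with_timeout(close_coro, timeout):
    """Await a cleanup coroutine, but give up after `timeout` seconds.

    Returns True if the cleanup finished, False if it timed out.
    """
    try:
        # wait_for cancels close_coro when the timeout expires.
        await asyncio.wait_for(close_coro, timeout)
        return True
    except asyncio.TimeoutError:
        return False
```

In the example from the question this might be applied as `await close_with_timeout(stream.close(), 5)`, assuming `QueueIterator.close()` (which appears in the trace) is the call that stalls.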
aio-pika is a high-level wrapper around aiormq and contains helpers for autoreconnects and some useful tricks. You can use aiormq as well, but then you have to handle reconnects etc. yourself.
I still don’t know how to properly exit QueueIterator, but at least I fixed the “double Ctrl+C” issue by using loop.run_until_complete instead of asyncio.run… even without passing the loop to connect or connect_robust. And the reason is the cancellation of tasks: https://github.com/python/cpython/blob/v3.8.5/Lib/asyncio/runners.py#L46
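For context: when its main coroutine ends or is interrupted, `asyncio.run` cancels every remaining task and then waits for all of them to finish, so a single task that never completes its cancellation keeps the process alive until a second Ctrl+C. The workaround described here skips that step. A minimal sketch, where `run_without_task_cleanup` is a hypothetical name:

```python
import asyncio


def run_without_task_cleanup(coro):
    """Run coro to completion without cancelling and awaiting leftover tasks.

    Unlike asyncio.run, this does not wait for other tasks on exit, so
    pending tasks are simply abandoned when the loop is closed.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
```

The trade-off is that abandoned tasks never get to run their cleanup code, and Python may log "Task was destroyed but it is pending!" for them.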
There is one aio-pika task that is getting stuck and does not react well to cancellation. After creating a custom signal handler that cancel()s the remaining tasks one by one on SIGINT, I suspect the problem lies in aiormq.Connection.__heartbeat_task.
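A diagnostic along these lines could be sketched as follows. This is not the commenter's actual handler; `cancel_one_by_one` and `report_stuck_tasks` are hypothetical names, and the one-second timeout is an arbitrary choice.

```python
import asyncio


async def cancel_one_by_one(tasks, timeout=1.0):
    """Cancel each task in turn and return those that did not finish in time."""
    stuck = []
    for task in tasks:
        task.cancel()
        # asyncio.wait does not raise on timeout; it reports the task as pending.
        done, pending = await asyncio.wait([task], timeout=timeout)
        if pending:
            stuck.append(task)
    return stuck


async def report_stuck_tasks():
    """Cancel every task except the current one and print the unresponsive ones."""
    current = asyncio.current_task()
    others = [t for t in asyncio.all_tasks() if t is not current]
    for task in await cancel_one_by_one(others):
        print('did not react to cancellation:', task)
```

On Unix it could be wired to Ctrl+C from inside a running loop with `loop.add_signal_handler(signal.SIGINT, lambda: asyncio.ensure_future(report_stuck_tasks()))`. The printed task repr includes the coroutine name, which should point at the task that is stuck.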