aio-pika: In aio-pika>=5 reconnect completely broken (RobustConnection)
Try it yourself.
- Run one simple infinite publisher and one simple consumer in two separate processes.
- restart/kill rabbitmq
- …
- PROFIT
The reconnect logic in RobustConnection has been completely broken since aio-pika 5. (It is broken in aio-pika 4 too, but not so dramatically.)
import asyncio
import argparse
import signal
from typing import Dict, Any
import logging
import aio_pika
import itertools
logger = logging.getLogger(__name__)
def parse_args() -> Dict[str, Any]:
    """Parse the command line and return the options as a plain dict.

    The returned dict always has the keys ``rabbit`` and ``subcmd``; when a
    subcommand (``consumer`` or ``publisher``) is given it also has ``queue``.
    ``subcmd`` is ``None`` when no subcommand was supplied.

    Raises:
        SystemExit: raised by argparse on invalid arguments, including a
            subcommand that is given without ``-q/--queue``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-r', '--rabbit',
        # The value is handed to aio_pika.connect_robust as-is, so it is a
        # connection URL (see the default), not a bare host:port pair.
        help='RabbitMQ connection URL, ex: amqp://user:password@host:port/',
        default='amqp://rabbitmq',
    )
    commands = parser.add_subparsers(dest='subcmd')

    # Both subcommands take the same queue option. Without it the queue name
    # would be None and the failure would only surface later inside the AMQP
    # calls, so require it up front.
    consumer = commands.add_parser('consumer')
    consumer.add_argument(
        '-q', '--queue',
        help='Name of the queue to consume from',
        required=True,
    )
    publisher = commands.add_parser('publisher')
    publisher.add_argument(
        '-q', '--queue',
        help='Name of the queue to publish to (used as the routing key)',
        required=True,
    )
    return vars(parser.parse_args())
async def publisher_task(conf: Dict) -> None:
    """Publish an endless stream of numbered messages.

    Connects to ``conf['rabbit']`` via ``aio_pika.connect_robust``, opens a
    channel with ``publisher_confirms=True`` and publishes
    ``'message: <n>'`` (n = 0, 1, 2, ...) to the channel's default exchange,
    using ``conf['queue']`` as the routing key. The loop never terminates on
    its own; it ends only through cancellation or an exception.
    """
    logger.info('Run publisher')
    routing_key = conf['queue']
    connection = await aio_pika.connect_robust(conf['rabbit'])
    async with connection:
        channel = await connection.channel(publisher_confirms=True)
        # Disabled: passively declare the target queue before publishing.
        # queue = await channel.declare_queue(
        #     routing_key, passive=True
        # )
        for sequence in itertools.count():
            text = 'message: {}'.format(sequence)
            logger.info('Send message: %r', text)
            outgoing = aio_pika.Message(body=text.encode())
            await channel.default_exchange.publish(
                outgoing,
                routing_key=routing_key,
            )
            # Disabled: throttle the publisher.
            # await asyncio.sleep(0.1)
async def consumer_task(conf: Dict) -> None:
    """Consume messages from the queue ``conf['queue']`` and log each body.

    Connects to ``conf['rabbit']`` via ``aio_pika.connect_robust``, sets a
    prefetch count of 10 on the channel, declares the queue with
    ``passive=True`` and iterates over incoming messages. Each message is
    handled inside its ``process()`` context manager.
    """
    logger.info('Run consumer')
    source_queue = conf['queue']
    connection = await aio_pika.connect_robust(conf['rabbit'])
    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue(source_queue, passive=True)
        async with queue.iterator() as deliveries:
            async for delivery in deliveries:
                if aio_pika.version_info <= (5,):
                    # Older aio-pika: process() is entered with a plain
                    # (synchronous) ``with``.
                    with delivery.process():
                        logger.info('Message - %r', delivery.body)
                    continue
                # Newer aio-pika: process() is entered with ``async with``.
                async with delivery.process():
                    logger.info('Message - %r', delivery.body)
async def run(conf: Dict) -> None:
    """Dispatch to the coroutine selected by ``conf['subcmd']``.

    ``'publisher'`` runs ``publisher_task`` and ``'consumer'`` runs
    ``consumer_task``; both receive ``conf`` unchanged.

    Raises:
        ValueError: if ``conf['subcmd']`` is neither of the two names.
    """
    subcmd = conf['subcmd']
    # Reject anything unexpected first so the happy path stays flat.
    if subcmd not in ('publisher', 'consumer'):
        raise ValueError(f'unknown subcommand {subcmd!r}')
    task = publisher_task if subcmd == 'publisher' else consumer_task
    await task(conf)
def main():
    """Script entry point: configure logging and drive ``run`` on the loop.

    Parses the command line, sets up DEBUG-level logging, then schedules
    ``run(args)`` as a task on the event loop. SIGTERM and SIGINT cancel that
    task; the loop is stopped as soon as the task finishes for any reason.
    If the task was not cancelled, its result is retrieved so that any
    exception it raised propagates to the caller. The loop is always closed.

    Raises:
        ValueError: if no subcommand was given on the command line.
    """
    args = parse_args()
    log_format = (
        '[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)1.1s] '
        '[%(name)s]:\t%(message)s'
    )
    logging.basicConfig(
        level=logging.DEBUG,
        format=log_format,
        datefmt='%Y.%m.%d %H:%M:%S',
    )
    logger.debug('parsed args - %r', args)
    if not args.get('subcmd'):
        raise ValueError('subcommand not specified')

    loop = asyncio.get_event_loop()
    loop.set_debug(False)
    run_task = loop.create_task(run(args))

    # Stop run_forever() once the task is done (finished, failed or cancelled).
    run_task.add_done_callback(lambda _finished: loop.stop())

    # Termination signals cancel the task, which in turn stops the loop.
    for signum in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(signum, run_task.cancel)

    try:
        loop.run_forever()
        if not run_task.cancelled():
            # Re-raises whatever exception the task ended with, if any.
            run_task.result()
    finally:
        loop.close()
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()
Start rabbitmq docker
docker run -d -h $(hostname) --name rabbitmq -p 15679:15672 rabbitmq:3.7-management
Create test_queue
in management rabbitmq UI
Start consumer in docker
docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py consumer -q test_queue
Start publisher in docker
docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py publisher -q test_queue
Restart rabbitmq container
docker restart rabbitmq
As a result, both the consumer and publisher processes exit.
Consumer log
[2019.06.20 08:07:51.916] [13] [I] [__main__]: Message - b'message: 5234'
[2019.06.20 08:07:51.920] [13] [I] [__main__]: Message - b'message: 5235'
[2019.06.20 08:07:51.922] [13] [I] [__main__]: Message - b'message: 5236'
[2019.06.20 08:07:51.926] [13] [I] [__main__]: Message - b'message: 5237'
[2019.06.20 08:07:51.928] [13] [D] [aiormq.connection]: Reader task exited because:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
weight, channel, frame = await self.__receive_frame()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
return (yield from self().__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
return await self.writer.drain()
File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
raise exc
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.930] [13] [D] [aio_pika.connection]: Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
return await self.task
concurrent.futures._base.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/test/rabbit_test.py", line 131, in <module>
main()
File "/test/rabbit_test.py", line 125, in main
run_task.result()
File "/test/rabbit_test.py", line 88, in run
await consumer_task(conf)
File "/test/rabbit_test.py", line 80, in consumer_task
logger.info('Message - %r', message.body)
File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 400, in __aexit__
await self.close()
File "/usr/local/lib/python3.6/site-packages/aio_pika/tools.py", line 59, in awaiter
return await future
File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 357, in close
await self._amqp_queue.cancel(self._consumer_tag)
File "/usr/local/lib/python3.6/site-packages/aio_pika/robust_queue.py", line 110, in cancel
result = await super().cancel(consumer_tag, timeout, nowait)
File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 232, in cancel
timeout=timeout, loop=self.loop
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
return (yield from fut)
File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 308, in basic_cancel
nowait=nowait,
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 171, in wrap
return await self.create_task(func(self, *args, **kwargs))
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
raise self.exception from e
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
raise self.exception from e
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
raise self.exception from e
File "/test/rabbit_test.py", line 77, in consumer_task
logger.info('Message - %r', message.body)
File "/usr/local/lib/python3.6/site-packages/aio_pika/message.py", line 630, in __aexit__
await self.message.ack()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
weight, channel, frame = await self.__receive_frame()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
return (yield from self().__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
return await self.writer.drain()
File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
raise exc
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.981] [13] [E] [asyncio]: Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7fafdd184ac8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@b34f3c105852:/#
Publisher log
[2019.06.20 08:07:51.925] [12] [I] [__main__]: Send message: 'message: 5238'
[2019.06.20 08:07:51.926] [12] [D] [aio_pika.exchange]: Publishing message with routing key 'test_queue' via exchange <Exchange(): auto_delete=None, durable=None, arguments={})>: Message:{'app_id': None,
'body_size': 13,
'content_encoding': None,
'content_type': None,
'correlation_id': None,
'delivery_mode': 1,
'expiration': None,
'headers': {},
'message_id': None,
'priority': 0,
'reply_to': None,
'timestamp': None,
'type': 'None',
'user_id': None}
[2019.06.20 08:07:51.929] [12] [D] [aiormq.connection]: Reader task exited because:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
weight, channel, frame = await self.__receive_frame()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.932] [12] [D] [aio_pika.connection]: Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
return await self.task
concurrent.futures._base.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/test/rabbit_test.py", line 131, in <module>
main()
File "/test/rabbit_test.py", line 125, in main
run_task.result()
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
raise self.exception from e
File "/test/rabbit_test.py", line 86, in run
await publisher_task(conf)
File "/test/rabbit_test.py", line 54, in publisher_task
routing_key=queue_name,
File "/usr/local/lib/python3.6/site-packages/aio_pika/exchange.py", line 202, in publish
loop=self.loop, timeout=timeout
File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
return (yield from fut)
File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 438, in basic_publish
return await confirmation
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
weight, channel, frame = await self.__receive_frame()
File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.968] [12] [E] [asyncio]: Task was destroyed but it is pending!
task: <Task pending coro=<Connection.__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:168> wait_for=<Task finished coro=<__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/connection.py:346> result=None> cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.6/site-packages/aiormq/base.py:51, <TaskWakeupMethWrapper object at 0x7f197a45eb88>()]>
[2019.06.20 08:07:51.968] [12] [E] [asyncio]: Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7f197a45ef48>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@1f4b2385667f:/#
About this issue
- Original URL
- State: open
- Created 5 years ago
- Comments: 18 (8 by maintainers)
Duplicate of #202.
There is a slight possibility that what comes next is only true in combination with #615, since I reproduced your issue on that branch.
You should store the result of `master.create_worker()`, since `WeakSet`s are involved. The `Worker` instance has a strong reference to the `RobustQueue`, which will keep the `WeakRef` that the `RobustConnection` holds alive.
With the changes in this snippet it started working.
edit: see https://github.com/mosquito/aio-pika/issues/594