aio-pika: In aio-pika>=5 reconnect completely broken (RobustConnection)

Try it yourself.

  • run one simple infinite publisher and one simple consumer in two separate processes.
  • restart/kill rabbitmq
  • PROFIT

The reconnect logic in RobustConnection has been completely broken since aio-pika 5. (It is broken in aio-pika 4 too, but not as dramatically.)

import asyncio
import argparse
import signal
from typing import Dict, Any
import logging
import aio_pika
import itertools


logger = logging.getLogger(__name__)


def parse_args() -> Dict[str, Any]:
    """Parse the command line into a plain dictionary.

    Recognised arguments:

    * ``-r/--rabbit`` - broker URL, defaults to ``amqp://rabbitmq``.
    * a subcommand, ``consumer`` or ``publisher``, stored under ``subcmd``;
      each one requires ``-q/--queue``.

    Returns:
        A dict with the keys ``rabbit`` and ``subcmd`` (``None`` when no
        subcommand was given) and, when a subcommand was given, ``queue``.

    Raises:
        SystemExit: raised by argparse on invalid input, including a
            subcommand given without ``-q/--queue``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-r', '--rabbit',
        # The value is handed to the connection call as-is, so it has to be
        # a URL like the default, not a bare host:port pair.
        help='RabbitMQ URL, ex: amqp://host:port',
        default='amqp://rabbitmq',
    )

    commands = parser.add_subparsers(dest='subcmd')
    for name in ('consumer', 'publisher'):
        command = commands.add_parser(name)
        command.add_argument(
            '-q', '--queue',
            # Without a queue name both tasks would run with ``None``.
            required=True,
            help='Name of the queue to use',
        )

    return vars(parser.parse_args())


async def publisher_task(conf: Dict) -> None:
    """Publish an endless stream of numbered messages.

    Connects with ``aio_pika.connect_robust`` to ``conf['rabbit']`` and
    publishes ``message: <n>`` bodies through the channel's default exchange,
    using ``conf['queue']`` as the routing key. The loop has no exit
    condition; it ends only when the task is cancelled or an exception
    propagates.
    """
    logger.info('Run publisher')

    routing_key = conf['queue']
    numbers = itertools.count()
    connection = await aio_pika.connect_robust(conf['rabbit'])
    async with connection:
        channel = await connection.channel(publisher_confirms=True)
        # The queue is not declared here; the name is only used for routing.

        # No pause between iterations: each publish is awaited and the next
        # one starts immediately.
        for number in numbers:
            text = 'message: {}'.format(number)
            logger.info('Send message: %r', text)
            await channel.default_exchange.publish(
                aio_pika.Message(body=text.encode()),
                routing_key=routing_key,
            )


async def consumer_task(conf: Dict) -> None:
    """Consume messages from ``conf['queue']`` and log each body.

    Connects with ``aio_pika.connect_robust`` to ``conf['rabbit']``, sets a
    prefetch count of 10, declares the queue with ``passive=True`` and then
    iterates over incoming messages, processing each one inside
    ``message.process()``.
    """
    logger.info('Run consumer')

    name = conf['queue']
    connection = await aio_pika.connect_robust(conf['rabbit'])

    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue(name, passive=True)

        async with queue.iterator() as messages:
            async for message in messages:
                # ``message.process()`` is entered with ``async with`` when
                # the version check passes, with a plain ``with`` otherwise.
                if not aio_pika.version_info > (5,):
                    with message.process():
                        logger.info('Message - %r', message.body)
                    continue

                async with message.process():
                    logger.info('Message - %r', message.body)


async def run(conf: Dict) -> None:
    """Dispatch to the task selected by ``conf['subcmd']``.

    Raises:
        KeyError: if ``conf`` has no ``subcmd`` entry.
        ValueError: if the subcommand is neither ``publisher`` nor
            ``consumer``.
    """
    subcmd = conf['subcmd']

    # Reject anything unknown up front so the happy path stays flat.
    if subcmd != 'publisher' and subcmd != 'consumer':
        raise ValueError(f'unknown subcommand {subcmd!r}')

    if subcmd == 'publisher':
        await publisher_task(conf)
        return

    await consumer_task(conf)


def main():
    """Script entry point: parse arguments, configure logging, run the task.

    The selected coroutine is scheduled as a task on a freshly created event
    loop. SIGTERM and SIGINT cancel that task, and the task's done-callback
    stops the loop. Once the loop has stopped, an exception raised by the
    task is re-raised here unless the task was cancelled.

    Raises:
        ValueError: if no subcommand was given on the command line.
    """
    args = parse_args()

    logging.basicConfig(
        level=logging.DEBUG,
        format=(
            '[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)1.1s] '
            '[%(name)s]:\t%(message)s'
        ),
        datefmt='%Y.%m.%d %H:%M:%S'
    )

    logger.debug('parsed args - %r', args)

    if not args.get('subcmd'):
        raise ValueError('subcommand not specified')

    # asyncio.get_event_loop() is deprecated when no loop is current and
    # raises RuntimeError on recent Python versions, so create the loop
    # explicitly and install it as the current one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.set_debug(False)
        run_task = loop.create_task(run(args))

        def stop(_):
            # Done-callback: leave run_forever() as soon as the task ends.
            loop.stop()

        run_task.add_done_callback(stop)

        # Signals cancel the task; the done-callback above stops the loop.
        loop.add_signal_handler(signal.SIGTERM, run_task.cancel)
        loop.add_signal_handler(signal.SIGINT, run_task.cancel)

        loop.run_forever()
        if not run_task.cancelled():
            # Re-raises whatever exception the task finished with.
            run_task.result()
    finally:
        loop.close()


if __name__ == '__main__':
    main()

Start rabbitmq docker

docker run -d -h $(hostname) --name rabbitmq -p 15679:15672 rabbitmq:3.7-management

Create test_queue in management rabbitmq UI

Start consumer in docker

docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py consumer -q test_queue

Start publisher in docker

docker run -ti --rm -v "$(pwd)/dev:/test" --link rabbitmq python:3.6 /bin/bash
pip install aio-pika
python /test/rabbit_test.py publisher -q test_queue

Restart rabbitmq container

docker restart rabbitmq

As a result, both the consumer and the publisher processes exit.

Consumer log

[2019.06.20 08:07:51.916] [13] [I] [__main__]:	Message - b'message: 5234'
[2019.06.20 08:07:51.920] [13] [I] [__main__]:	Message - b'message: 5235'
[2019.06.20 08:07:51.922] [13] [I] [__main__]:	Message - b'message: 5236'
[2019.06.20 08:07:51.926] [13] [I] [__main__]:	Message - b'message: 5237'
[2019.06.20 08:07:51.928] [13] [D] [aiormq.connection]:	Reader task exited because:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
    return await self.writer.drain()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
    raise exc
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.930] [13] [D] [aio_pika.connection]:	Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/test/rabbit_test.py", line 131, in <module>
    main()
  File "/test/rabbit_test.py", line 125, in main
    run_task.result()
  File "/test/rabbit_test.py", line 88, in run
    await consumer_task(conf)
  File "/test/rabbit_test.py", line 80, in consumer_task
    logger.info('Message - %r', message.body)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 400, in __aexit__
    await self.close()
  File "/usr/local/lib/python3.6/site-packages/aio_pika/tools.py", line 59, in awaiter
    return await future
  File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 357, in close
    await self._amqp_queue.cancel(self._consumer_tag)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/robust_queue.py", line 110, in cancel
    result = await super().cancel(consumer_tag, timeout, nowait)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/queue.py", line 232, in cancel
    timeout=timeout, loop=self.loop
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 308, in basic_cancel
    nowait=nowait,
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 171, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/test/rabbit_test.py", line 77, in consumer_task
    logger.info('Message - %r', message.body)
  File "/usr/local/lib/python3.6/site-packages/aio_pika/message.py", line 630, in __aexit__
    await self.message.ack()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/tools.py", line 67, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 125, in drain
    return await self.writer.drain()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 329, in drain
    raise exc
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.981] [13] [E] [asyncio]:	Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7fafdd184ac8>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@b34f3c105852:/#

Publisher log

[2019.06.20 08:07:51.925] [12] [I] [__main__]:	Send message: 'message: 5238'
[2019.06.20 08:07:51.926] [12] [D] [aio_pika.exchange]:	Publishing message with routing key 'test_queue' via exchange <Exchange(): auto_delete=None, durable=None, arguments={})>: Message:{'app_id': None,
 'body_size': 13,
 'content_encoding': None,
 'content_type': None,
 'correlation_id': None,
 'delivery_mode': 1,
 'expiration': None,
 'headers': {},
 'message_id': None,
 'priority': 0,
 'reply_to': None,
 'timestamp': None,
 'type': 'None',
 'user_id': None}
[2019.06.20 08:07:51.929] [12] [D] [aiormq.connection]:	Reader task exited because:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.932] [12] [D] [aio_pika.connection]:	Closing AMQP connection <Connection: "amqp://rabbitmq">
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/test/rabbit_test.py", line 131, in <module>
    main()
  File "/test/rabbit_test.py", line 125, in main
    run_task.result()
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.6/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/test/rabbit_test.py", line 86, in run
    await publisher_task(conf)
  File "/test/rabbit_test.py", line 54, in publisher_task
    routing_key=queue_name,
  File "/usr/local/lib/python3.6/site-packages/aio_pika/exchange.py", line 202, in publish
    loop=self.loop, timeout=timeout
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/usr/local/lib/python3.6/site-packages/aiormq/channel.py", line 438, in basic_publish
    return await confirmation
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 350, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/usr/local/lib/python3.6/site-packages/aiormq/connection.py", line 301, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.6/asyncio/selector_events.py", line 752, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019.06.20 08:07:51.968] [12] [E] [asyncio]:	Task was destroyed but it is pending!
task: <Task pending coro=<Connection.__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:168> wait_for=<Task finished coro=<__reader() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/connection.py:346> result=None> cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.6/site-packages/aiormq/base.py:51, <TaskWakeupMethWrapper object at 0x7f197a45eb88>()]>
[2019.06.20 08:07:51.968] [12] [E] [asyncio]:	Task was destroyed but it is pending!
task: <Task pending coro=<reject_all() done, defined at /usr/local/lib/python3.6/site-packages/aiormq/base.py:65> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7f197a45ef48>()]> cb=[shield.<locals>._done_callback() at /usr/local/lib/python3.6/asyncio/tasks.py:688]>
root@1f4b2385667f:/#

About this issue

  • Original URL
  • State: open
  • Created 5 years ago
  • Comments: 18 (8 by maintainers)

Most upvoted comments

Duplicate of #202.

There is a slight possibility that what comes next is only true in combination with #615, since I reproduced your issue on that branch.

You should store the result of master.create_worker() since WeakSets are involved. The Worker instance has a strong reference to the RobustQueue which will keep the WeakRef the RobustConnection holds alive.

With the changes in this snippet it started working.

    client = await master.create_worker("my_task_name", worker, auto_delete=False, durable=True)

    try:
        await asyncio.Future()
    finally:
        await client.close()
        await connection.close()

edit: see https://github.com/mosquito/aio-pika/issues/594