aio-pika: Error flood by robust connection

I guess robust connection was broken. If we start simple consumer from example and restart RabbitMQ server than we will drown in errors returned by aio-pika.

Version 1.9.1 is OK, so the problem was introduced in 2.0.0a (when pika was upgraded to the version 0.11.0).

Example log:

RabbitMQ connection lost
Connection refused: 500 - (-1, 'EOF')
RabbitMQ connection lost
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Auto Reconnect Error',)>
ConnectionError: Auto Reconnect Error
Task exception was never retrieved
future: <Task finished coro=<RobustConnection.connect() done, defined at /home/decaz/workspace/aio-pika/aio_pika/robust_connection.py:101> exception=The AMQP connection was closed: ('Channel allocation requires an open connection: <AsyncioConnection CLOSED socket=None pa$
ams=<ConnectionParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>',)>
Traceback (most recent call last):
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_connection.py", line 110, in connect
    yield from channel.on_reconnect(self, number)
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_channel.py", line 64, in on_reconnect
    yield from self.initialize()
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_channel.py", line 74, in initialize
    result = yield from super().initialize()
  File "/home/decaz/workspace/aio-pika/aio_pika/channel.py", line 153, in initialize
    self._channel = yield from self._create_channel(timeout)
  File "/home/decaz/workspace/aio-pika/aio_pika/channel.py", line 132, in _create_channel
    channel_number=self._channel_number
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/connection.py", line 1153, in channel
    'Channel allocation requires an open connection: %s' % self)
pika.exceptions.ConnectionClosed: Channel allocation requires an open connection: <AsyncioConnection CLOSED socket=None params=<ConnectionParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Auto Reconnect Error',)>
ConnectionError: Auto Reconnect Error
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Auto Reconnect Error',)>
ConnectionError: Auto Reconnect Error
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Auto Reconnect Error',)>
ConnectionError: Auto Reconnect Error
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Auto Reconnect Error',)>
ConnectionError: Auto Reconnect Error
Exception in callback BaseConnection._handle_events(fd=6, events=1)()
handle: <Handle BaseConnection._handle_events(fd=6, events=1)()>
Traceback (most recent call last):
...
Traceback (most recent call last):
  File "/usr/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/adapters/base_connection.py", line 395, in _handle_events
    self._handle_read()
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/adapters/base_connection.py", line 449, in _handle_read
    self._on_data_available(data)
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/connection.py", line 1938, in _on_data_available
    self._process_frame(frame_value)
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/connection.py", line 2059, in _process_frame
    if self._process_callbacks(frame_value):
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/connection.py", line 2040, in _process_callbacks
    frame_value)  # Args
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/callback.py", line 60, in wrapper
    return function(*tuple(args), **kwargs)
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/callback.py", line 92, in wrapper
    return function(*args, **kwargs)
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/callback.py", line 236, in process
    callback(*args, **keywords)
asyncio.base_futures.InvalidStateError: invalid state
Exception in callback Queue.declare.<locals>.on_queue_declared(<Future finis...nect Error',)>) at /home/decaz/workspace/aio-pika/aio_pika/queue.py:72
handle: <Handle Queue.declare.<locals>.on_queue_declared(<Future finis...nect Error',)>) at /home/decaz/workspace/aio-pika/aio_pika/queue.py:72>
Traceback (most recent call last):
  File "/usr/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/home/decaz/workspace/aio-pika/aio_pika/queue.py", line 73, in on_queue_declared
    res = result.result()
ConnectionError: Auto Reconnect Error
Task exception was never retrieved
future: <Task finished coro=<RobustConnection.connect() done, defined at /home/decaz/workspace/aio-pika/aio_pika/robust_connection.py:101> exception=ConnectionError('Auto Reconnect Error',)>
Traceback (most recent call last):
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_connection.py", line 110, in connect
    yield from channel.on_reconnect(self, number)
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_channel.py", line 70, in on_reconnect
    yield from queue.on_reconnect(self)
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_queue.py", line 34, in on_reconnect
    yield from self.declare()
  File "/home/decaz/workspace/aio-pika/aio_pika/common.py", line 122, in wrap
    return (yield from func(self, *args, **kwargs))
  File "/usr/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/home/decaz/workspace/aio-pika/aio_pika/queue.py", line 73, in on_queue_declared
    res = result.result()
ConnectionError: Auto Reconnect Error
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Auto Reconnect Error',)>
ConnectionError: Auto Reconnect Error
Exception in callback BaseConnection._handle_events(fd=7, events=1)()
handle: <Handle BaseConnection._handle_events(fd=7, events=1)()>
...
Task exception was never retrieved
future: <Task finished coro=<RobustConnection.connect() done, defined at /home/decaz/workspace/aio-pika/aio_pika/robust_connection.py:101> exception=ConnectionError('Auto Reconnect Error',)>
Traceback (most recent call last):
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_connection.py", line 110, in connect
    yield from channel.on_reconnect(self, number)
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_channel.py", line 70, in on_reconnect
    yield from queue.on_reconnect(self)
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_queue.py", line 41, in on_reconnect
    yield from self.consume(consumer_tag=consumer_tag, **kwargs)
  File "/home/decaz/workspace/aio-pika/aio_pika/robust_queue.py", line 75, in consume
    consumer_tag = yield from super().consume(consumer_tag=consumer_tag, **kwargs)
  File "/home/decaz/workspace/aio-pika/aio_pika/common.py", line 122, in wrap
    return (yield from func(self, *args, **kwargs))
  File "/home/decaz/workspace/aio-pika/aio_pika/queue.py", line 199, in consume
    yield from future
ConnectionError: Auto Reconnect Error
Exception in callback Connection._on_connect_timer()
handle: <TimerHandle when=468382.871150722 Connection._on_connect_timer()>
Traceback (most recent call last):
  File "/usr/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/connection.py", line 1811, in _on_connect_timer
    error = self._adapter_connect()
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/adapters/asyncio_connection.py", line 193, in _adapter_connect
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/adapters/base_connection.py", line 170, in _adapter_connect
    error = self._create_and_connect_to_socket(sock_addr)
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/adapters/base_connection.py", line 203, in _create_and_connect_to_socket
    sock_addr_tuple[0], sock_addr_tuple[1], sock_addr_tuple[2])
  File "/home/decaz/.virtualenvs/aio-pika/lib/python3.6/site-packages/pika/adapters/base_connection.py", line 255, in _create_tcp_connection_socket
    return socket.socket(sock_family, sock_type, sock_proto)
  File "/usr/lib/python3.6/socket.py", line 144, in __init__
OSError: [Errno 24] Too many open files

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 32 (8 by maintainers)

Most upvoted comments

I am experiencing both Robust connection does not work along with Error flood by robust connection. I believe the original issue subject is more important than the latter and am a bit confused why it was changed above.

Using the following code:

import asyncio
import aio_pika
import logging

logger = logging.getLogger(__name__)


class RabbitMQConnection:
    def __init__(self, *, username='guest', password='guest', server='127.0.0.1', port: int=5672,
                 loop: asyncio.AbstractEventLoop=None, exchange='test', routing_key: str=None):
        self._connect_string = 'amqp://{username}:{password}@{server}:{port}/'.format(
            username=username, password=password, server=server, port=port)
        self._loop = loop or asyncio.get_event_loop()
        self._exchange = exchange
        self._routing_key = routing_key

    async def run_coroutine(self):
        logger.debug("Connection({}) coroutine started".format(self._connect_string))
        keep_going = True
        while keep_going:
            try:

                # connection = await aio_pika.connect(self._connect_string, loop=self._loop)
                # async with connection:

                connection = await aio_pika.connect_robust(self._connect_string, loop=self._loop)
                logger.info("Connection({}) established".format(self._connect_string))
                async with connection.channel() as channel:
                    exchange = await channel.declare_exchange(name=self._exchange, type=aio_pika.ExchangeType.TOPIC)
                    queue = await channel.declare_queue(name='test', auto_delete=True)
                    await queue.bind(exchange, routing_key='test.*')
                    while True:
                        await asyncio.sleep(0.001, loop=self._loop)
            except asyncio.CancelledError:
                logger.info("Connection({}) cancelled".format(self._connect_string))
                keep_going = False
            except (aio_pika.exceptions.AMQPError, ConnectionError) as error:
                logger.error("Connection({}) {}".format(self._connect_string, error))
                keep_going = True
            except Exception as ex:
                logger.exception(ex)
                keep_going = True
            finally:
                logger.info("Connection({}) lost".format(self._connect_string))

        logger.debug("Connection({}) coroutine exited".format(self._connect_string))


def main():
    logging.basicConfig()
    logger.setLevel('DEBUG')
    rmq = RabbitMQConnection()
    asyncio.get_event_loop().run_until_complete(rmq.run_coroutine())


if __name__ == '__main__':
    main()
  1. Start the python program.
  2. The connection is established.
  3. Stop the RabbitMQ service.
  4. The connection is lost.
  5. Error flood occurs (no Exceptions make it out to catch)
  6. Start the RabbitMQ service.
  7. Connection is never re-established.

The issue is actually more widespread than aio_pika.connect_robust() as it also affects aio_pika.connect(). The non robust connection also fails to emit Exceptions and also fails to reconnect.

The above code snippet can be used to test both cases.

Does anyone have some code that creates a robust I can use until this issue is resolved? This is a deal breaker for our project. I don’t even have pika module installed to revert it to 0.10.0 in my venv. Is it a requirement? My connection is working, somehow, without it showing up in a pip freeze

@Artimi now it’s done. Version 4.3.1 uploaded to pypi.

still in progress…

Hello! We have the same problem. I cannot catch the exclusion ` backend-reminder | Traceback (most recent call last): backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/robust_connection.py”, line 119, in connect backend-reminder | yield from channel.on_reconnect(self, number) backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/robust_channel.py”, line 64, in on_reconnect backend-reminder | yield from self.initialize() backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/robust_channel.py”, line 74, in initialize backend-reminder | result = yield from super().initialize() backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/channel.py”, line 167, in initialize backend-reminder | self._channel = yield from self._create_channel(timeout) backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/channel.py”, line 145, in _create_channel backend-reminder | channel_number=self._channel_number backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/pika/connection.py”, line 710, in channel backend-reminder | self._channels[channel_number].open() backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/pika/channel.py”, line 635, in open backend-reminder | self._rpc(spec.Channel.Open(), self._on_openok, [spec.Channel.OpenOk]) backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/pika/channel.py”, line 1139, in _rpc backend-reminder | self._send_method(method_frame) backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/pika/channel.py”, line 1150, in _send_method backend-reminder | self.connection._send_method(self.channel_number, method_frame, content) backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/pika/connection.py”, line 1569, in _send_method backend-reminder | self._send_frame(frame.Method(channel_number, method_frame))

backend-reminder | File “/usr/local/lib/python3.6/site-packages/aio_pika/pika/connection.py”, line 1548, in _send_frame backend-reminder | raise exceptions.ConnectionClosed yaya-backend-reminder | aio_pika.pika.exceptions.ConnectionClosed

` After starting the Rabbit connection is restored. But exceptions go to sentry