aio-pika: Robust_connection and RuntimeError "Writer is None"

Is it ok that robust_connection with args reconnection_interval=0.5 and timeout=2 throws exception when connection couldn’t be re-established during exchange.publish(Message(...))?

RuntimeError: Writer is None
Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/lib/python3.7/asyncio/tasks.py:596> exception=RuntimeError('Writer is None')>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/tasks.py", line 223, in __step
    result = coro.send(None)
  File "/usr/lib/python3.7/asyncio/tasks.py", line 603, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.7/dist-packages/aiormq/tools.py", line 86, in __await__
    return (yield from self().__await__())
  File "/usr/local/lib/python3.7/dist-packages/aiormq/connection.py", line 136, in drain
    raise RuntimeError("Writer is %r" % self.writer)
RuntimeError: Writer is None
socket.send() raised exception. 

When I use just a Connecton class and wrap connection with:

while self.cnx is None:
           try:
               self.cnx = await connect(f"amqp://{self.user}:{self.password}@{self.host}/", loop=self.loop, timeout=2)
           except (ConnectionError, ConnectionRefusedError, ConnectionResetError, ConnectionAbortedError):
               continue
else:
           self.connected = True
           self.ch = await self.cnx.channel()
           if self.exchange_type == 'fanout':
               self.ex = await self.ch.declare_exchange(self.exchange, ExchangeType.FANOUT)
           elif self.exchange_type == 'topic':
               self.ex = await self.ch.declare_exchange(self.exchange, ExchangeType.TOPIC)
           elif self.exchange_type == 'direct':
               self.ex = await self.ch.declare_exchange(self.exchange, ExchangeType.DIRECT)
           if not self.queue is None:
               self.queue = await self.ch.declare_queue(self.queue_name, durable=True)
               await self.queue.bind(self.ex, self.bind)
           return self

I see no exceptions during code execution. I use this to reconnect to server if connection was lost during message sending. I know that I’ve reinvented a wheel, but it works as I expected.

About this issue

  • Original URL
  • State: open
  • Created 4 years ago
  • Reactions: 4
  • Comments: 15 (8 by maintainers)

Commits related to this issue

Most upvoted comments

@mosquito So, I tracked this down to being an issue with the RobustChannel not actually being robust. When a channel dies, you can call await channel.reopen() and it fixes everything, but I would expect RobustChannel to handle this itself. Instead, the channel just dies, and reopen has to be called manually. Close callbacks aren’t called when a channel is closed non-gracefully, so you can’t just add a callback to re-open.

You can reproduce by creating a channel, then doing something that will close the channel such as binding a queue that doesn’t exist to an exchange.

I am encountering the same situation. Minimal reproduction:

import contextlib
import aio_pika

connection = await aio_pika.connect_robust(url)
exit_stack = contextlib.AsyncExitStack()
await exit_stack.enter_async_context(connection)
channel = await connection.channel()
await channel.declare_queue(subject_that_exists, passive=True)  # works fine
await channel.declare_queue(subject_that_doesnt_exist, passive=True)  # raises aiormq.exceptions.ChannelNotFoundEntity
await channel.declare_queue(subject_that_exists, passive=True)  # raises aiormq.exceptions.ChannelInvalidStateError: writer is None
await channel.reopen()
await channel.declare_queue(subject_that_exists, passive=True)  # works fine