MassTransit: Consumer doesn't consume messages after RabbitMQ restart

Is this a bug report?

Yes

Can you also reproduce the problem with the latest version?

Yes

Environment

  1. Operating system: Windows 10 Pro
  2. Visual Studio version: Community 2019
  3. Dotnet version: .NET Core 3.1

Steps to Reproduce

Steps:

  1. Set ConcurrentMessageLimit=1 (for easily bug reproducing; I noticed that it is more easily reproducible by hands with 1 concurrent message limit).
  2. Setup 3 RabbitMQ nodes (3.8 version) as a cluster (sync mode: ha-mode=all; ha-sync-mode: automatic), also they should be placed in docker for easy achieving of bug.
  3. Setup any proxy service for accessing the RabbitMQ cluster (e.g. HAProxy).
  4. Setup a consumer connection with RabbitMQ cluster via HAProxy (3).
  5. Send over 60000 messages (it happens when RabbitMQ syncing for a long time).
  6. Make a delay in a consume method (it can be equal 5 sec; it is important to get an error of already closed channel or “NACK failed”).
  7. Randomly kill and start RabbitMQ nodes while consuming (after some iterations of restarting RabbitMQ nodes and syncing consumer will stop to consume messages).

Expected Behavior

Consumer always reconnects to queue.

Actual Behavior

  • Consumer doesn’t reconnect to queue.
  • At channel section in RabbitMQ management message: “… no channels …”.
  • Last exceptions:
Message NACK failed: 6, Original Exception: System.Threading.Channels.ChannelClosedException: The channel has been closed.
   at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
System.Threading.Channels.ChannelClosedException: The channel has been closed.
   at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)
   at MassTransit.RabbitMqTransport.Pipeline.RabbitMqReceiveLockContext.Faulted(Exception exception)
...
System.Threading.Channels.ChannelClosedException: The channel has been closed.
   at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.RabbitMqTransport.Pipeline.RabbitMqBasicConsumer.<>c__DisplayClass23_0.<<HandleBasicDeliver>b__0>d.MoveNext()

Reproducible Demo

(Paste the link to an example project and exact instructions to reproduce the issue.)

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 60 (32 by maintainers)

Commits related to this issue

Most upvoted comments

So, I’m calling this done as I could consistently reproduce it and now with the final latest develop build it is no longer an issue.

Total time spent resolving it? Easily 12-16 hours in the past 1.5 days.

So, I tried to reproduce the failure to reconnect, but couldn’t. I did consistently reproduce the channel closed exception.

Source Code

[08:42:02 ERR] Message NACK failed: 33, Original Exception: System.Threading.Channels.ChannelClosedException: The channel has been closed.
   at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
System.Threading.Channels.ChannelClosedException: The channel has been closed.
   at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)
   at MassTransit.RabbitMqTransport.Pipeline.RabbitMqReceiveLockContext.Faulted(Exception exception)
[08:42:02 ERR] T-FAULT rabbitmq://localhost/event-listener dc3a0000-ebb8-e450-7f0c-08d88bcf7b44
System.Threading.Channels.ChannelClosedException: The channel has been closed.
   at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.RabbitMqTransport.Pipeline.RabbitMqBasicConsumer.<>c__DisplayClass23_0.<<HandleBasicDeliver>b__0>d.MoveNext()

Using MassTransit 7.0.6 directly from NuGet.

I’ll look at it.