EasyNetQ: Subscriptions not cached when RabbitMQ is unavailable

There is an issue with subscription requests not being cached when RabbitMQ is unavailable and a timeout is set. There is also an issue with the Bus.Subscribe and Bus.SubscribeAsync methods blocking indefinitely if no timeout is set. From reading this documentation:

Subscribers can subscribe using bus.Subscribe even when a broker is not available. The subscription details are cached by EasyNetQ. When a broker becomes available the connection loop succeeds, a connection with the broker is established, and all the cached subscriptions are created.

it suggests that the subscription request is cached. This doesn’t seem to be the case because:

  • If a timeout is set, Subscribe throws an exception once the timeout expires (note that no exception is thrown if RabbitHutch.CreateBus fails to connect within the timeout period). After this, the subscription is never created, even after RabbitMQ comes back online.
  • If no timeout is set, the calling thread is blocked until the subscription completes, so it is not possible to register multiple subscriptions to be created when the connection is restored.

This is problematic when attempting to subscribe when RabbitMQ is down.

The following code demonstrates the issue (RabbitMQ must be unavailable to reproduce it).

var bus = RabbitHutch.CreateBus(_connectionString, eventHandlers, serviceRegister =>
{
   serviceRegister.Register<IEasyNetQLogger>(_ => new ConsoleLogger());
});

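// With RabbitMQ down, the first call blocks (or throws TimeoutException if a timeout is set),
// so the second subscription is never reached.
bus.Subscribe<string>("Test1", s => { Console.WriteLine("Test1 received: " + s); });
bus.Subscribe<string>("Test2", s => { Console.WriteLine("Test2 received: " + s); });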

If RabbitMQ is unavailable, the call blocks while attempting to subscribe to Test1. If a timeout is set, the Subscribe method throws the following exception, and the subscription is not created when RabbitMQ reconnects.

System.TimeoutException: The operation requested on PersistentChannel timed out.
    at EasyNetQ.Producer.ClientCommandDispatcherSingleton.Invoke[T](Func`2 channelAction)
    at EasyNetQ.Producer.ClientCommandDispatcher.Invoke[T](Func`2 channelAction)
    at EasyNetQ.RabbitAdvancedBus.QueueDeclare(String name, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Nullable`1 perQueueMessageTtl, Nullable`1 expires, Nullable`1 maxPriority, String deadLetterExchange, String deadLetterRoutingKey, Nullable`1 maxLength, Nullable`1 maxLengthBytes)
    at EasyNetQ.RabbitBus.SubscribeAsync[T](String subscriptionId, Func`2 onMessage, Action`1 configure)
    at EasyNetQ.RabbitBus.Subscribe[T](String subscriptionId, Action`1 onMessage, Action`1 configure)
    at EasyNetQ.RabbitBus.Subscribe[T](String subscriptionId, Action`1 onMessage)
    at EasyNetQTest.Program.Main(String[] ingore) in c:\Users\alistair.clark\Documents\Visual Studio 2013\Projects\EasyNetQTest\EasyNetQTest\Program.cs:line 44
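
Until this is resolved, one possible workaround when a timeout is set is to retry the subscription from application code. The following is only a minimal sketch under assumptions: RetryUntilSubscribedAsync is a hypothetical helper, not part of EasyNetQ, and it assumes that calling Subscribe again after a TimeoutException is safe, which this issue does not confirm. The demo uses a stand-in delegate instead of a real bus so that it runs without a broker.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SubscribeRetry
{
    // Hypothetical helper (not an EasyNetQ API): invokes `subscribe` repeatedly
    // until it stops throwing TimeoutException or the token is cancelled.
    // Returns the number of attempts that were made.
    public static async Task<int> RetryUntilSubscribedAsync(
        Action subscribe,
        TimeSpan retryDelay,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        var attempts = 0;
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            attempts++;
            try
            {
                subscribe();
                return attempts;
            }
            catch (TimeoutException)
            {
                // Broker still unavailable: wait, then try again.
                await Task.Delay(retryDelay, cancellationToken);
            }
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        // Stand-in for bus.Subscribe: times out twice, then succeeds.
        var calls = 0;
        Action fakeSubscribe = () =>
        {
            if (++calls < 3) throw new TimeoutException("simulated broker outage");
        };

        var attempts = SubscribeRetry
            .RetryUntilSubscribedAsync(fakeSubscribe, TimeSpan.FromMilliseconds(10))
            .GetAwaiter().GetResult();

        Console.WriteLine("Subscribed after " + attempts + " attempts");
    }
}
```

With a real bus, the delegate would wrap a call such as bus.Subscribe<string>("Test1", ...), and each subscription would be started on its own task (for example with Task.Run) so that a blocked Test1 does not hold up Test2. This only covers the timeout case; without a timeout the Subscribe call never returns, so there is nothing to retry.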

About this issue

  • State: open
  • Created 8 years ago
  • Comments: 16 (9 by maintainers)

Most upvoted comments

Hi,

I haven’t actually had a chance to work on the PR yet. I’ll let you know once I start.