EasyNetQ: System.NotSupportedException: Pipelining of requests forbidden

Hi,

I have the following issue.

In my local machine RabbitMQ works fine with no issue. Local Version is 3.7.0, Erlang 20.1.

But when I try to use other (using network) version of RabbitMQ it sends me the following exception Version 3.6.14 Erlang 18.3

System.NotSupportedException: Pipelining of requests forbidden at RabbitMQ.Client.Impl.RpcContinuationQueue.Enqueue(IRpcContinuation k) at RabbitMQ.Client.Impl.ModelBase.Enqueue(IRpcContinuation k) at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange, String type, Boolean passive, Boolean durable, Boolean autoDelete, Boolean internal, Boolean nowait, IDictionary2 arguments) at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclare(String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary2 arguments) at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass33_0.<ExchangeDeclareAsync>b__1(IModel x) at EasyNetQ.Producer.ClientCommandDispatcherSingleton.<>c__DisplayClass8_0.<InvokeAsync>b__0(IModel x) at EasyNetQ.Producer.ClientCommandDispatcherSingleton.<>c__DisplayClass7_01.b__1(IModel channel) at EasyNetQ.Producer.PersistentChannel.InvokeChannelAction(Action1 channelAction) at EasyNetQ.Producer.ClientCommandDispatcherSingleton.<>c__DisplayClass7_01.b__0() — End of stack trace from previous location where exception was thrown — at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at EasyNetQ.RabbitAdvancedBus.d__33.MoveNext() — End of stack trace from previous location where exception was thrown — at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at EasyNetQ.Producer.PublishExchangeDeclareStrategy.d__4.MoveNext() — End of stack trace from previous location where exception was thrown — at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at EasyNetQ.RabbitBus.d__131.MoveNext() — End of stack trace from previous location where exception was thrown — at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at WOF.Bus.Infrastructure.EasyNetQBusAdapter.<PublishAsync>d__61.MoveNext()'`

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 47 (27 by maintainers)

Most upvoted comments

Hi guys, recently I’ve been facing exactly the same exception and after some investigating I found out this is related to passive declaration (either IModel.QueueDeclarePassive and IModel.ExchangeDeclarePassive). As stated in rabbitmq client docu (https://www.rabbitmq.com/dotnet-api-guide.html#passive-declaration) if queue/exchange doesn’t exists you’ll get OperationInterruptedException and this channel cannot be used anymore. However, AdvancedBus when passing parameter passive = true will enqueue this operation and in case of error will still reuse persistent channel which will leads to massive NotSupportedExceptions. In our case we get rid of passive flag when executing QueueDeclare, proper soultion would involve recreating persistent channel in StartDispatcherThread when handling OperationInterruptedException in ClientCommandDispatcherSingleton as OperationInterrupted is a channel-level exception. I might test it in near future. Hope it helps

Hey. I had the same problems. I have proxies for some interfaces and got a stack:

System.NotSupportedException: Pipelining of requests forbidden in EasyNetQ.Producer.ClientCommandDispatcherSingleton.Invoke[T](Func2 channelAction) in EasyNetQ.Producer.ClientCommandDispatcher.Invoke[T](Func2 channelAction) in EasyNetQ.RabbitAdvancedBus.QueueDeclare(String name, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Nullable1 perQueueMessageTtl, Nullable1 expires, Nullable1 maxPriority, String deadLetterExchange, String deadLetterRoutingKey, Nullable1 maxLength, Nullable1 maxLengthBytes) in Castle.Proxies.Invocations.IAdvancedBus_QueueDeclare.InvokeMethodOnTarget() in Castle.DynamicProxy.AbstractInvocation.Proceed() in Castle.DynamicProxy.StandardInterceptor.PerformProceed(IInvocation invocation) in Castle.DynamicProxy.StandardInterceptor.Intercept(IInvocation invocation) in Castle.DynamicProxy.AbstractInvocation.Proceed() in Castle.Proxies.IAdvancedBusProxy.QueueDeclare(String name, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, Nullable1 perQueueMessageTtl, Nullable1 expires, Nullable1 maxPriority, String deadLetterExchange, String deadLetterRoutingKey, Nullable1 maxLength, Nullable1 maxLengthBytes) in EasyNetQ.RabbitBus.SubscribeAsync[T](String subscriptionId, Func2 onMessage, Action1 configure) in EasyNetQ.RabbitBus.Subscribe[T](String subscriptionId, Action1 onMessage, Action1 configure) in MyNamespace.Rabbit..(IBus ) in MyNamespace.Rabbit.ActionBusWrapper.(IBus ) in MyNamespace.Rabbit.(ActionBusWrapper , IBus )

I logged thread ids. It turned out that the error occurs when the InternalConsumer object is dispose: public void Dispose() { if (this.disposed) return; this.disposed = true; if (this.Model == null) return; this.consumerDispatcher.QueueAction((Action) (() => //this code { this.Model.Dispose(); foreach (BasicConsumer basicConsumer in (IEnumerable<BasicConsumer>) this.basicConsumers) basicConsumer.Dispose(); })); ` }
consumerDispatcher thread id == thread id of method caller.

I copy InternalConsumerFactory implementation and InternalConsumer implementation. In InternalConsumerFactory i create my InternalConsumer. In InternalConsumer i remove dispatcher call and NotSupportedException error disappeared. I use this code: public void Dispose() { if (this.disposed) return; this.disposed = true; if (this.Model == null) return; this.Model.Dispose(); foreach (BasicConsumer basicConsumer in (IEnumerable<BasicConsumer>) this.basicConsumers) basicConsumer.Dispose(); }

@Pliner I’ve tried to reproduce it but no luck so far. Now it’s bit more difficult to tell if it’s a EasyNetQ issue or RabbitMQ .net client issue as @michaelklishin said.

How it happen seems to be:

  1. Right after an disconnect/reconnect event (I’m not sure if this is essential)
  2. EasyNetQ: start to declare a queue.
  3. RabbitMQ client: ModelBase called m_continuationQueue.Enqueue()
  4. RabbitMQ client: When reading (writing?) the response from the socket, it timeouted (30 seconds).
  5. This is likely where bug happen: Either EasyNetQ or RabbitMQ client doesn’t got this network disconnection-ish event tracked/handled correctly.
  6. EasyNetQ: Still consider the connection is connected, so it still uses the same ModelBase. When try to declare next queue, the “pipelining” UnSupportedException throws from RabbitMQ client. (Because the call back from the RabbitMQ client’s SocketFrame -> m_continuationQueue.Next() didn’t happen.)

Difficult to simulate this is, I’m not able to simulate the timeout in the right time. And I don’t have time to check how RabbitMQ/EasyNetQ handles network errors yet. So not able to tell which side causes the bug.

There is no evidence that this is a .NET client bug.

When a continuation times out it can be a natural consequence that more operations are attempted by application code immediately after. This is something we’ve seen in at least two other clients with a similar architecture. Either way the caller will get an exception. If it’s not a timeout exception but a “pipelining one” the client could be more defensive but all we have here is a stack trace and speculation.

@zvolkov, @Vladimirezh, @mmikirtumov Feel free to reopen this issue if the problem will happen again in 3.3.

@Pliner I think that this is something like a deadlock. Deadlock didnt happen because Dispose throws TimeoutException. I proceeded from the fact that the Model is not thread safe. I proxy the consumer IModel and logged the method, model number (just inc int then create model called) and thread id of call. When an error occurred, it was always called Dispose. Dispose throws TimeoutException, then an ACK came from another thread. I logged the thread number before the dispatcher called and the thread was with id of the thread in which the consumer was created. I cant say if I did the right thing by removing the dispatcher call. But for 4 days this error was not repeated and there wasnt disconnect from the RabbitMq (before, after the error the RabbitMq broke client connection).