smallrye-reactive-messaging: RabbitMQ - BackPressureFailure: Buffer full

I am regularly getting BackPressureFailure errors from my RabbitMQ consumer in my Quarkus (2.12.2.Final, Java 17) application.

io.smallrye.mutiny.subscription.BackPressureFailure: Buffer full, cannot emit item
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.failOverflow(MultiFlatMapOp.java:541)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:243)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
	at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
	at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
	at io.smallrye.mutiny.vertx.MultiReadStream.lambda$subscribe$2(MultiReadStream.java:77)
	at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.lambda$handler$0(RabbitMQConsumerImpl.java:59)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129)
	at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.handleMessage(RabbitMQConsumerImpl.java:150)
	at io.vertx.rabbitmq.impl.QueueConsumerHandler.lambda$handleDelivery$0(QueueConsumerHandler.java:39)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
	at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:833)
	at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:596)
	at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)

The weird thing is that I have configured max-outstanding-messages, so in my understanding it should be impossible to get this error.

mp:
  messaging:
    incoming:
      new-message:
        automatic-recovery-enabled: true
        max-outstanding-messages: 2
        exchange:
          name: x
          type: topic
        queue:
          name: x
        routing-keys:
          - x.#
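
For reference, the same channel attributes can be written as flattened MicroProfile Config properties. This is only a sketch: it assumes the channel is bound to the RabbitMQ connector (the connector key is not shown in the YAML above), and the x values are placeholders copied from the original config.

    # application.properties equivalent (sketch, derived from the YAML above)
    mp.messaging.incoming.new-message.connector=smallrye-rabbitmq
    mp.messaging.incoming.new-message.automatic-recovery-enabled=true
    # max-outstanding-messages caps how many unacknowledged messages the connector keeps in flight
    mp.messaging.incoming.new-message.max-outstanding-messages=2
    mp.messaging.incoming.new-message.exchange.name=x
    mp.messaging.incoming.new-message.exchange.type=topic
    mp.messaging.incoming.new-message.queue.name=x
    mp.messaging.incoming.new-message.routing-keys=x.#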

Here’s the code that handles messages; consume contains a catch-all, so no exception escapes it.

    @Incoming("new-event")
    @Blocking
    public CompletionStage<Void> newMessage(Message<?> message) {
        var meta = message.getMetadata(IncomingRabbitMQMetadata.class);
        meta.ifPresent(amqp -> {
            var key = amqp.getRoutingKey();
            LOG.info("Message {} received", key);
            consume(key, amqp.getHeaders());
        });
        return message.ack();
    }
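
For completeness, consume looks roughly like the sketch below. The body is hypothetical (the actual processing is elided), but it illustrates the catch-all: no exception from processing can propagate back into newMessage.

    // Hypothetical sketch of consume(): real processing elided.
    // The catch-all ensures newMessage() never sees an exception from here.
    private void consume(String routingKey, Map<String, Object> headers) {
        try {
            // ... actual message handling ...
        } catch (Exception e) {
            LOG.error("Failed to process message with routing key {}", routingKey, e);
        }
    }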

Any ideas how to approach this?

About this issue

  • State: open
  • Created 2 years ago
  • Reactions: 5
  • Comments: 17

Most upvoted comments

Hi everyone, any news on this topic? We have the same problem; it appeared when we migrated our application from Spring Boot to Quarkus, and to simulate the prefetch count we set “max-outstanding-messages” to 250.

Is there a workaround we could use, for example increasing this number? Regards
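
As a sketch of the workaround being asked about (channel name reused from the YAML in the original report, value chosen arbitrarily for illustration), raising the limit only allows more unacknowledged messages in flight, so it may delay rather than remove the buffer overflow:

    mp:
      messaging:
        incoming:
          new-message:
            # workaround sketch: raise the outstanding-message limit
            # (1000 is an illustrative value, increased from the 250 mentioned above)
            max-outstanding-messages: 1000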