smallrye-reactive-messaging: RabbitMQ - BackPressureFailure: Buffer full
I am regularly getting BackPressureFailures from my RabbitMQ consumer in my Quarkus (2.12.2-Final, Java 17) application.
io.smallrye.mutiny.subscription.BackPressureFailure: Buffer full, cannot emit item
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.failOverflow(MultiFlatMapOp.java:541)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:243)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.vertx.MultiReadStream.lambda$subscribe$2(MultiReadStream.java:77)
at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.lambda$handler$0(RabbitMQConsumerImpl.java:59)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129)
at io.vertx.rabbitmq.impl.RabbitMQConsumerImpl.handleMessage(RabbitMQConsumerImpl.java:150)
at io.vertx.rabbitmq.impl.QueueConsumerHandler.lambda$handleDelivery$0(QueueConsumerHandler.java:39)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:833)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:596)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
The weird thing is that I have configured max-outstanding-messages, so in my understanding it should be impossible to get this error.
mp:
  messaging:
    incoming:
      new-message:
        automatic-recovery-enabled: true
        max-outstanding-messages: 2
        exchange:
          name: x
          type: topic
        queue:
          name: x
        routing-keys:
          - x.#
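For illustration only: the stack trace above shows the failure being raised from failOverflow after tryEmit could not hand an item on, which suggests a bounded buffer receiving items faster than they are drained. The following stdlib-only sketch models that situation in general terms. It is not the Mutiny or SmallRye implementation, and the class and method names (BoundedBufferDemo, pushWithoutDemand, pushWithDemand) are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Simplified, hypothetical model of a bounded buffer; NOT the Mutiny or SmallRye code.
public class BoundedBufferDemo {

    /**
     * Pushes `items` messages into a buffer of size `capacity` without ever
     * draining it, the way a source that ignores downstream demand would.
     * Returns how many messages were rejected because the buffer was full.
     */
    public static int pushWithoutDemand(int capacity, int items) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < items; i++) {
            // offer() returns false instead of blocking when the queue is full
            if (!buffer.offer(i)) {
                rejected++;
            }
        }
        return rejected;
    }

    /**
     * Same producer, but the consumer removes each item before the next one
     * arrives, so the buffer never fills up.
     */
    public static int pushWithDemand(int capacity, int items) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < items; i++) {
            if (!buffer.offer(i)) {
                rejected++;
            }
            buffer.poll(); // consumer keeps up with the producer
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(pushWithoutDemand(2, 5));
        System.out.println(pushWithDemand(2, 5));
    }
}
```

In this model, a rejected offer plays the role of the "Buffer full, cannot emit item" failure: it only happens when the producer pushes regardless of how much the consumer has drained.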
Here’s the code that handles messages. consume contains a catch-all statement.
@Incoming("new-event")
@Blocking
public CompletionStage<Void> newMessage(Message<?> message) {
    var meta = message.getMetadata(IncomingRabbitMQMetadata.class);
    meta.ifPresent(amqp -> {
        var key = amqp.getRoutingKey();
        LOG.info("Message {} received", key);
        consume(key, amqp.getHeaders());
    });
    return message.ack();
}
Any ideas how to approach this?
About this issue
- State: open
- Created 2 years ago
- Reactions: 5
- Comments: 17
Hi everyone, any news on this topic? We have the same problem, which appeared when we migrated our application from Spring Boot to Quarkus. To simulate the prefetch count, we used the “max-outstanding-messages” setting with a value of 250.
Is there a workaround, for example increasing this number? Regards
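To make the prefetch-style idea mentioned above concrete: the intent of such a limit is to cap how many messages are delivered but not yet acknowledged. Below is a minimal stdlib-only sketch of that idea, assuming a simple permit counter. It is not how the connector implements "max-outstanding-messages", and the names InFlightLimiter, tryDeliver, and ack are hypothetical.

```java
import java.util.concurrent.Semaphore;

// Hypothetical illustration of an "outstanding messages" cap; not connector code.
public class InFlightLimiter {

    private final Semaphore permits;

    public InFlightLimiter(int maxOutstanding) {
        this.permits = new Semaphore(maxOutstanding);
    }

    /** Returns true if one more message may be delivered, false if the cap is reached. */
    public boolean tryDeliver() {
        return permits.tryAcquire();
    }

    /** Marks one delivered message as acknowledged, freeing a slot. */
    public void ack() {
        permits.release();
    }

    /** Number of messages that could still be delivered right now. */
    public int available() {
        return permits.availablePermits();
    }
}
```

With a cap of 2, a third delivery is refused until one of the first two messages is acknowledged. Raising the cap only widens that window; it does not by itself make a producer respect the cap.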