quarkus: Reactive Messaging (RabbitMQ) - can't ack messages manually with consumer ack

Describe the bug

It seems that whatever I do messages get always acknowledged automatically. How can I ack a message manually?

Code:

    @Incoming("prices")
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> process(Message<String> message) {
        String priceInUsdString = message.getPayload();
        Integer priceInUsd = Integer.parseInt(priceInUsdString);
        LOGGER.infof("process priceInUsd=%d", priceInUsd);
        return message.ack();
    }
    @Inject
    @Channel("generated-price")
    Emitter<Integer> emitter;

    public Integer generateInteger() {
        Integer i = random.nextInt(100);
        LOGGER.infof("generateInteger i=%d send", i);
        emitter.send(
                Message.of(i, () -> {
                            // Called when the message is acknowledged.
                            LOGGER.infof("generateInteger i=%d ack", i);
                            return CompletableFuture.completedFuture(null);
                        },
                        reason -> {
                            // Called when the message is acknowledged negatively.
                            LOGGER.infof("generateInteger i=%d nack", i);
                            return CompletableFuture.completedFuture(null);
                        }));
        return i;
    }

Calling:

curl http://localhost:8080/prices/generate

Produces the output:

2022-10-03 11:25:31,810 INFO  [org.acm.amq.PriceGenerator] (executor-thread-0) generateInteger i=26 send
2022-10-03 11:25:31,816 INFO  [org.acm.amq.PriceGenerator] (vert.x-eventloop-thread-1) generateInteger i=26 ack
2022-10-03 11:25:31,817 INFO  [org.acm.amq.PriceConverter] (vert.x-eventloop-thread-2) process priceInUsd=26
2022-10-03 11:25:31,818 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17033: A message sent to channel `prices` has been ack'd

Expected behavior

I’d expect the following output (the ack message only after the message has been acked):

2022-10-03 11:25:31,810 INFO  [org.acm.amq.PriceGenerator] (executor-thread-0) generateInteger i=26 send

2022-10-03 11:25:31,817 INFO  [org.acm.amq.PriceConverter] (vert.x-eventloop-thread-2) process priceInUsd=26
2022-10-03 11:25:31,818 INFO  [io.sma.rea.mes.rabbitmq] (vert.x-eventloop-thread-2) SRMSG17033: A message sent to channel `prices` has been ack'd
2022-10-03 11:25:31,816 INFO  [org.acm.amq.PriceGenerator] (vert.x-eventloop-thread-1) generateInteger i=26 ack

Actual behavior

The ‘message is acknowledged’ block gets called before the message is acked.

How to Reproduce?

  1. Run the attached reproducer.
  2. Run: curl http://localhost:8080/prices/generate
  3. Check output

ack-test.zip

Requirements: an rabbitmq instance

Output of uname -a or ver

Linux a 5.15.65-1-MANJARO #1 SMP PREEMPT Mon Sep 5 10:15:47 UTC 2022 x86_64 GNU/Linux

Output of java -version

openjdk version “18.0.2” 2022-07-19 OpenJDK Runtime Environment (build 18.0.2+0) OpenJDK 64-Bit Server VM (build 18.0.2+0, mixed mode)

Quarkus version or git rev

Quarkus 2.13.0.Final

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63) Maven home: /opt/apache-maven-3.8.6 Java version: 11.0.14.1, vendor: Eclipse Adoptium, runtime: /opt/jdk-11.0.14.1+1 Default locale: en_US, platform encoding: UTF-8 OS name: “linux”, version: “5.15.65-1-manjaro”, arch: “amd64”, family: “unix”

Additional information

No response

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 17 (8 by maintainers)

Most upvoted comments

I’ve not confirmed but I believe we use the RabbitMQPublisher feature (from https://vertx.io/docs/vertx-rabbitmq-client/java/#_reliable_message_publishing) of the underlying vertx client, which does publisher confirmations.