quarkus: Problem with Quarkus Kafka SmallRye consumer multithreading

Describe the bug

After setting the “partitions” parameter to 3, I see 3 different consumer ids, but the messages are still processed sequentially.

  • Quarkus version 2.x
  • SmallRye

Properties that I set:

mp:
  messaging:
    incoming:
      event:
        connector: smallrye-kafka
        auto:
          offset:
            reset: earliest
        topic: MY_TOPIC
        group:
          id: my-group
        partitions: 3
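
For reference, the same configuration in flat application.properties form (a direct mapping of the YAML keys above, with the channel name event) should be equivalent:

mp.messaging.incoming.event.connector=smallrye-kafka
mp.messaging.incoming.event.topic=MY_TOPIC
mp.messaging.incoming.event.auto.offset.reset=earliest
mp.messaging.incoming.event.group.id=my-group
mp.messaging.incoming.event.partitions=3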

Expected behavior

The messages should be processed concurrently across the partitions.

Actual behavior

The messages are processed sequentially even though they come from different partitions.

Output of java -version

jdk11

Quarkus version or git rev

2.4.1

Build tool (i.e. output of mvnw --version or gradlew --version)

3.6.3

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 26 (12 by maintainers)

Most upvoted comments

No, the documentation is right; it just does not do the dispatching the way you describe it, and will still serialize the records following the reactive streams semantics.

Another solution is something like:


List<Executor> executors = ...

@Incoming("kafka")
@Outgoing("foo")
public Multi<Message<T>> dispatch(Multi<Message<T>> multi) {
    return multi.onItem().transformToUniAndMerge(msg -> {
        // Pick the executor that corresponds to the record's partition
        IncomingKafkaRecordMetadata metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
        int p = metadata.getPartition();
        Executor executor = executors.get(p);
        return Uni.createFrom().item(msg).emitOn(executor);
    });
}

Note that even with this, the emission will still be serialized. The items would be handled on multiple threads, but emitted one after the other. You may want to do something like:

@Incoming("kafka")
@Outgoing("foo")
public Multi<Message<T>> dispatch(Multi<Message<T>> multi) {
   return multi.onItem().transformToUniAndMerge(msg -> {
       IncomingKafkaMetadata metadata = msg.getMetadata(IncomingKafkaMetadata.class).orElseThrow();
       int p = metadata.getPartition();
       Executor executor = executors.get(p);
       return Uni.createFrom().item(msg).emitOn(executor)
           .onItem().invoke(msg -> {
                    process(msg); msg.ack(); })
  }); 
}

so that the processing runs on the different threads.
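
The executors list in the snippets above is left open on purpose; a minimal sketch for building it, assuming one single-threaded executor per partition (3 here, matching the configuration above), could look like:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// One single-threaded executor per partition: records from the same partition
// keep their relative order, while different partitions are processed in parallel.
int partitions = 3;
List<Executor> executors = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
    executors.add(Executors.newSingleThreadExecutor());
}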

You can try with the low-level Kafka client, but it’s a rabbit hole.
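
For completeness, here is a minimal sketch of that low-level approach, using the plain Apache Kafka KafkaConsumer with one consumer (and one thread) per partition in the same consumer group; the topic and group id are taken from the configuration above, the bootstrap server address and everything else is illustrative:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Three threads, each with its own KafkaConsumer in the same group: the broker
// assigns one partition to each consumer, so the partitions are polled and
// processed concurrently.
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("MY_TOPIC"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // process(record): application-specific handling
                }
            }
        }
    }).start();
}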

Hi, so is the guide wrong? https://quarkus.io/guides/kafka, paragraph 4.5 -> 2

@davideginna Thanks for the reproducer. Your tests are running on Quarkus version 2.2.1.Final, which had an issue with the @Blocking handling. Starting from 2.4.0.Final you shouldn’t have the issue anymore. BTW, here are a couple more issues I noticed:

  • In ResourceConsumer.java the incoming method should subscribe to the channel event (the incoming channel you configured) instead of sender.
  • Your incoming method received events through an in-memory channel because you had a @Broadcast on the producer.
  • You can get the incoming message partition with message.getMetadata(IncomingKafkaRecordMetadata.class); see the sketch after this list. Previously you were accessing the outgoing metadata of the message transmitted through the in-memory channel, not Kafka.
  • I think you need to configure the number of partitions on the Kafka topic you are creating; alternatively, you can use Dev Services for Kafka to create the topic partitions for you.
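
As an illustration of the metadata point above, here is a sketch of an incoming method that logs the partition and the processing thread (channel name event as configured earlier; the import path is the one used by recent SmallRye Reactive Messaging versions and may differ slightly between releases):

import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;

@Incoming("event")
public CompletionStage<Void> consume(Message<String> message) {
    // Read the partition from the Kafka-specific incoming metadata (not the outgoing metadata)
    int partition = message.getMetadata(IncomingKafkaRecordMetadata.class)
            .map(IncomingKafkaRecordMetadata::getPartition)
            .orElse(-1);
    System.out.println("partition=" + partition + " thread=" + Thread.currentThread().getName());
    return message.ack();
}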

Hope this helps.