smallrye-reactive-messaging: Kafka consumer stops without any exception logged

Problem description

My Kafka consumers occasionally stop consuming. The consumers are distributed across multiple machine nodes, and each node is configured with multiple consumer threads (partitions > 1). Once in a while one node stops consuming while the other nodes keep going, and no consumer rebalance is triggered. How can I eliminate this problem?

It happens roughly a couple of times a week. I have consumers for multiple topics, and it has happened with several of them.

Kafka Grafana Metric


Config

mp:
  messaging:
    incoming:
      location-traffic-in:
        connector: smallrye-kafka
        topic: location
        graceful-shutdown: true
        group:
          id: location-traffic
        key:
          deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value:
          deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto:
          offset:
            reset: latest
        enable:
          auto:
            commit: true
        fetch:
          max:
            wait:
              ms: 10
        batch: true
        broadcast: false
        partitions: 3
        requests: 700
        max.poll.records: 200
        failure-strategy: ignore

Code

@Slf4j
@ApplicationScoped
public class LocationTrafficConsumer {
    @Incoming("location-traffic-in")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    @NonBlocking
    public Uni<List<Void>> consume(List<String> events) {
        try {
            // .... (elided: builds uniList, one Uni per event)
            return Uni.join().all(uniList).andCollectFailures()
                    .onFailure().retry().atMost(3)
                    .onFailure().invoke(t -> LIMIT_EXCEPTION.tryAcquire(() -> log.warn("Deal Exception", t)))
                    .onFailure().recoverWithNull();
        } catch (Throwable e) {
            log.warn("Exception", e);
            return Uni.createFrom().nullItem();
        }
    }

}
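One plausible cause of a silent stall is a single record whose processing (even after the three retries) never completes, leaving the batch's joined `Uni` pending forever with nothing logged. Below is a minimal, stdlib-only sketch of the mitigation idea: bound each event's processing with a timeout and swallow failures so the batch always completes, mirroring the `recoverWithNull()` in the original pipeline. The `process` method and the 5-second limit are hypothetical placeholders; in the actual Mutiny pipeline the equivalent would be a per-`Uni` timeout operator such as `ifNoItem().after(...)`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutGuardSketch {

    // Hypothetical per-event processing; stands in for the real handler logic.
    static CompletableFuture<Void> process(String event) {
        return CompletableFuture.runAsync(() -> {
            // simulate work on the event
        });
    }

    // Bound each event's processing time so one hung record cannot
    // stall the whole batch indefinitely.
    static CompletableFuture<Void> consumeBatch(List<String> events, long timeoutSec) {
        CompletableFuture<?>[] futures = events.stream()
                .map(e -> process(e)
                        .orTimeout(timeoutSec, TimeUnit.SECONDS)
                        // swallow failures so the joined batch still completes,
                        // mirroring recoverWithNull() in the original pipeline
                        .exceptionally(t -> null))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    public static void main(String[] args) {
        consumeBatch(List.of("a", "b", "c"), 5).join();
        System.out.println("batch done");
    }
}
```

With this guard in place, a hung record fails its own future after the timeout instead of blocking the batch, so the consumer keeps polling.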

Environment and dependency packages

  • System:Linux 4.4.0-116-generic
  • JDK:openjdk-17
  • Framework:Quarkus
  • Dependency:smallrye-reactive-messaging-kafka 3.13.0

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 18 (2 by maintainers)

Most upvoted comments

Is it possible that processing of some records takes more than 60 s? If so, raising the unprocessed-record max age to 500 s would put you on the safe side of that check.
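The suggestion above maps to a connector attribute. This is a sketch assuming the throttled commit strategy's health check is what flags long-running records (note that with `enable.auto.commit: true` the connector may not use that strategy at all); the 500000 ms value is illustrative, matching the 500 s mentioned in the comment:

```yaml
mp:
  messaging:
    incoming:
      location-traffic-in:
        # Health check reports failure when a received record stays
        # unprocessed longer than this; the default is 60000 (60 s).
        throttled:
          unprocessed-record-max-age:
            ms: 500000
```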