smallrye-reactive-messaging: Kafka consumer stopped without any exception log
Problem description
My Kafka consumers occasionally stop consuming. The consumers are distributed across multiple machine nodes, and each node is configured with multiple consumer threads (partitions > 1). Once in a while, one of the nodes stops consuming while the other nodes keep going, and no consumer rebalance is triggered. How can I eliminate this problem?
It happens a couple of times a week. I have consumers for multiple topics, and it has happened in more than one of them.
Kafka Grafana Metric

Config
mp:
  messaging:
    incoming:
      location-traffic-in:
        connector: smallrye-kafka
        topic: location
        graceful-shutdown: true
        group:
          id: location-traffic
        key:
          deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value:
          deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto:
          offset:
            reset: latest
        enable:
          auto:
            commit: true
        fetch:
          max:
            wait:
              ms: 10
        batch: true
        broadcast: false
        partitions: 3
        requests: 700
        max.poll.records: 200
        failure-strategy: ignore
Code
@Slf4j
@ApplicationScoped
public class LocationTrafficConsumer {

    @Incoming("location-traffic-in")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    @NonBlocking
    public Uni<List<Void>> consume(List<String> events) {
        try {
            // ....
            return Uni.join().all(uniList).andCollectFailures()
                    .onFailure().retry().atMost(3)
                    .onFailure().invoke(t -> LIMIT_EXCEPTION.tryAcquire(() -> log.warn("Deal Exception", t)))
                    .onFailure().recoverWithNull();
        } catch (Throwable e) {
            log.warn("Exception", e);
            return Uni.createFrom().nullItem();
        }
    }
}
Environment and dependency packages
- System: Linux 4.4.0-116-generic
- JDK: openjdk-17
- Framework: Quarkus
- Dependency: smallrye-reactive-messaging-kafka 3.13.0
About this issue
- State: closed
- Created 2 years ago
- Comments: 18 (2 by maintainers)
Is it possible that processing some records takes more than 60 s? Is that why setting the unprocessed max age to 500 s puts you on the safe side of that check?
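To make the check in question concrete, here is a minimal sketch of an "oldest unprocessed record" age test. It assumes the check being discussed is the throttled commit strategy's unprocessed-record age monitoring (the `throttled.unprocessed-record-max-age.ms` connector attribute, 60000 ms by default); that is a reading of the comment, not something the thread confirms. The class `UnprocessedAgeCheck` and its methods are hypothetical and only illustrate the idea; they are not the connector's implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not part of SmallRye: remembers when each record was
// received and reports whether the oldest unprocessed one exceeds a max age.
final class UnprocessedAgeCheck {
    private final Deque<Instant> receivedAt = new ArrayDeque<>();
    private final Duration maxAge;

    UnprocessedAgeCheck(Duration maxAge) {
        this.maxAge = maxAge;
    }

    // Record the arrival time of a new, not yet processed record.
    void received(Instant when) {
        receivedAt.addLast(when);
    }

    // Mark the oldest outstanding record as processed.
    void processedOldest() {
        receivedAt.pollFirst();
    }

    // True when the oldest outstanding record has waited longer than maxAge.
    boolean isStale(Instant now) {
        Instant oldest = receivedAt.peekFirst();
        return oldest != null && Duration.between(oldest, now).compareTo(maxAge) > 0;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        UnprocessedAgeCheck sixty = new UnprocessedAgeCheck(Duration.ofSeconds(60));
        UnprocessedAgeCheck fiveHundred = new UnprocessedAgeCheck(Duration.ofSeconds(500));
        sixty.received(start);
        fiveHundred.received(start);

        // A record that is still unprocessed 90 s after it arrived.
        Instant later = start.plusSeconds(90);
        System.out.println("60s limit stale:  " + sixty.isStale(later));
        System.out.println("500s limit stale: " + fiveHundred.isStale(later));
    }
}
```

Under these assumptions, a record that takes 90 s to process trips a 60 s limit but not a 500 s one, which would be consistent with a larger max age hiding the symptom rather than fixing slow processing.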