hono: AbstractAtLeastOnceKafkaConsumer may consume nothing when a leader for a subscribed topic is not elected yet
AbstractAtLeastOnceKafkaConsumer first calls subscribe and then uses poll to query records from Kafka.
The call to subscribe, though, does not necessarily mean that partitions have already been assigned to the consumer. This can be seen whenever the consumer subscribes to a topic that does not yet exist and, right afterwards, a record is published to this topic, auto-creating it. Hono’s integration tests follow this pattern: first the northbound consumer subscribes, and right afterwards a message is published by a protocol adapter.
Consequently, this leads to a race condition: if the assignment of partitions to the consumer has not been completed before the first poll is triggered, the client polls without a partition assignment, i.e. each poll returns nothing. No exception is thrown (afaik the Apache Kafka client itself does not throw an error in that case either); the client just does not receive any messages.
The described behaviour can be observed in the following logs (look for the line "The following subscribed topics are not assigned to any members"):
13:56:00.110 [vert.x-kafka-consumer-thread-1] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=telemetry-e4e91f55-f62b-4709-a21e-9c61bd5ece34, groupId=its-805d4582-3c66-49c5-bbe0-665ac4755f20] Error while fetching metadata with correlation id 2 : {hono.telemetry.d89cefce-3641-48d7-8664-08e85ff49226=LEADER_NOT_AVAILABLE}
13:56:00.110 [vert.x-kafka-consumer-thread-0] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=event-598c3d47-0859-4b2c-af74-d55ba22eb05e, groupId=its-805d4582-3c66-49c5-bbe0-665ac4755f20] Error while fetching metadata with correlation id 2 : {hono.event.d89cefce-3641-48d7-8664-08e85ff49226=LEADER_NOT_AVAILABLE}
13:56:00.151 [vert.x-kafka-consumer-thread-1] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=telemetry-e4e91f55-f62b-4709-a21e-9c61bd5ece34, groupId=its-805d4582-3c66-49c5-bbe0-665ac4755f20] Error while fetching metadata with correlation id 7 : {hono.telemetry.d89cefce-3641-48d7-8664-08e85ff49226=LEADER_NOT_AVAILABLE, hono.event.d89cefce-3641-48d7-8664-08e85ff49226=LEADER_NOT_AVAILABLE}
13:56:00.153 [vert.x-kafka-consumer-thread-1] WARN o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=telemetry-e4e91f55-f62b-4709-a21e-9c61bd5ece34, groupId=its-805d4582-3c66-49c5-bbe0-665ac4755f20] The following subscribed topics are not assigned to any members: [hono.telemetry.d89cefce-3641-48d7-8664-08e85ff49226, hono.event.d89cefce-3641-48d7-8664-08e85ff49226]
I did a bit of research but could not find a workaround yet. An obvious solution would be to use the partitionsAssignedHandler of Vert.x’s KafkaConsumer, so that polling only starts once the consumer has been assigned its partitions. However, this partitionsAssignedHandler is never called unless a poll is triggered (or a handler is set on the consumer, which then means losing control over the poll loop). So it feels like a chicken-and-egg problem to me.
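Since the assignment only seems to progress while polling, one conceivable way around the chicken-and-egg situation is to keep issuing short polls until the consumer reports a non-empty assignment, with a timeout. The following is only a sketch under assumptions, not Hono's or Vert.x's code: `AssignmentWait` and `pollUntilAssigned` are hypothetical names, `pollOnce` stands in for a short poll (for example `KafkaConsumer.poll(Duration)` in the Apache Kafka client) and `currentAssignment` for a query of the assigned partitions (for example `KafkaConsumer.assignment()`). Records returned by those polls would still have to be handled inside `pollOnce`.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not part of Hono, Vert.x or the Kafka client. */
final class AssignmentWait {

    private AssignmentWait() {
    }

    /**
     * Repeatedly runs pollOnce until currentAssignment reports at least one
     * partition or the timeout has elapsed.
     *
     * @return true if an assignment was observed, false if the timeout was hit first.
     */
    static boolean pollUntilAssigned(
            final Runnable pollOnce,
            final Supplier<Set<String>> currentAssignment,
            final Duration timeout) {

        final long deadline = System.nanoTime() + timeout.toNanos();
        while (currentAssignment.get().isEmpty()) {
            if (System.nanoTime() - deadline >= 0) {
                // Still nothing assigned; the caller decides whether to retry or fail.
                return false;
            }
            // A poll is what drives the group join and rebalance on the client side.
            pollOnce.run();
        }
        return true;
    }
}
```

Whether an assignment shows up quickly enough for a topic that was auto-created after the subscription is an open question here, so the timeout branch should be treated as a normal outcome rather than an error.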
About this issue
- State: closed
- Created 3 years ago
- Comments: 23 (23 by maintainers)
It turned out that setting the Kafka client’s configuration property auto.offset.reset (see https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_auto.offset.reset) to earliest makes the problem (consumer is not assigned to any partition) disappear on my machine (which is macOS based) as well as on a Linux machine running the same test in a Jenkins job. I cannot make any sense of that; at the time of this writing I assume it is more of a side effect of setting this property.
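For reference, a minimal sketch of how this property could be set on a plain Kafka consumer configuration. The class and method names are hypothetical and the way Hono itself passes client properties may differ; only the property keys (`bootstrap.servers`, `group.id`, `auto.offset.reset`) are actual Kafka consumer settings.

```java
import java.util.Properties;

/** Hypothetical illustration of the consumer setting discussed above. */
final class ConsumerConfigSketch {

    private ConsumerConfigSketch() {
    }

    static Properties consumerConfig(final String bootstrapServers, final String groupId) {
        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", bootstrapServers);
        config.setProperty("group.id", groupId);
        // Kafka's default is "latest": without a committed offset the consumer starts
        // at the end of the partition. "earliest" starts at the beginning instead.
        config.setProperty("auto.offset.reset", "earliest");
        return config;
    }
}
```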
(When writing integration tests I became aware that not all expected messages were received, which turned out to be due to the default value of auto.offset.reset. In the process I noticed that this also makes the aforementioned “workaround”, meaning using partitionsFor, obsolete.)

As far as I know, automatic topic creation is not recommended for production setups. For Hono, the idea was that when a tenant is created, the tenant manager also creates its topics and the corresponding ACLs. Automatic topic creation is configured at the broker, so we cannot assume that a cluster has it enabled. It looks like Confluent disables it too: https://riferrei.com/2020/03/17/why-the-property-auto-create-topics-enable-is-disabled-in-confluent-cloud/
@kaniyan What I am interested in is whether it is possible to use the abstraction that the Vert.x KafkaConsumer provides in combination with manual offset committing. My understanding was that you can either use the record handler and the helper handlers like partitionsAssignedHandler with automatic offset commit (or maybe external offset storage), or you use none of that and do the explicit polling and committing, i.e. control the loop yourself. If it is true that we cannot mix the two, we need to avoid the abstractions of the KafkaConsumer. The background of my question was that I wanted to know whether you tested if this assumption might be false and we could use the handlers of the abstraction and combine them with explicit committing, because that would be an interesting option.
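To make the "control the loop yourself" variant concrete, here is a rough sketch of one iteration of an explicit poll/process/commit cycle with at-least-once semantics. It is not the code of AbstractAtLeastOnceKafkaConsumer: `AtLeastOnceLoop` and `RecordSource` are hypothetical stand-ins, with `poll()` playing the role of a consumer poll and `commit()` that of a manual offset commit (such as `commitSync()` in the Apache Kafka client).

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of an explicit poll loop with manual commit. */
final class AtLeastOnceLoop {

    /** Hypothetical stand-in for a consumer with manual offset committing. */
    interface RecordSource<T> {
        /** Returns the next batch of records, possibly empty. */
        List<T> poll();

        /** Commits the offsets of everything returned by earlier polls. */
        void commit();
    }

    private AtLeastOnceLoop() {
    }

    /**
     * Runs one iteration: poll, process every record, then commit.
     * If the handler throws, the commit is skipped, so the batch would be
     * delivered again later (at least once, possibly more than once).
     *
     * @return the number of records processed in this iteration.
     */
    static <T> int pollProcessCommit(final RecordSource<T> source, final Consumer<? super T> handler) {
        final List<T> batch = source.poll();
        for (final T record : batch) {
            handler.accept(record);
        }
        if (!batch.isEmpty()) {
            source.commit();
        }
        return batch.size();
    }
}
```

Because the commit happens only after the whole batch has been handled, a failure in the middle leaves the offsets untouched. That is the property that is hard to keep when the record handler of the abstraction drives the polling.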