kafka-node: Wrong offset is being committed, messages are missing
Hi there, we are currently having problems running “kafka-node” in our production system.
Info:
- Kafka-Broker Version 0.10.1
- Node.js Version 7.4.0
- kafka-node Version: 1.5.0
- using ConsumerGroups
- using a ConsumerGroup per topic
- 3 topics, each with ~4.5 million messages across 30 partitions
- Java and Scala consumer implementations have no problems consuming these topics
Implementation:
- We are using the suggested way to control the incoming message flow (see the sketch after this list): https://github.com/krystianity/node-sinek/blob/master/lib/kafka/Drainer.js#L292, an async.queue with a concurrency of one
- the problem also exists if we simply consume every message, with "autoCommit" turned off and without the async.queue flow control
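For reference, here is a minimal sketch of that flow-control pattern, assuming a hypothetical topic name, zookeeper host, and handleMessage() function (none of these are from the actual setup): the ConsumerGroup is paused whenever a message is queued and resumed once the single-slot async.queue runs empty.

```js
const async = require('async');
const { ConsumerGroup } = require('kafka-node');

const consumer = new ConsumerGroup({
  host: 'localhost:2181', // placeholder zookeeper connection (kafka-node 1.x)
  groupId: 'a-group-id',
  fromOffset: 'earliest',
  autoCommit: false
}, 'my-topic'); // placeholder topic name

// Single-slot queue: at most one message is processed at a time.
const queue = async.queue((message, done) => {
  handleMessage(message, done); // hypothetical processing function
}, 1);

consumer.on('message', (message) => {
  consumer.pause(); // stop fetching while work is pending
  queue.push(message, (err) => {
    if (err) console.error(err);
    if (queue.idle()) consumer.resume(); // fetch again once the queue is empty
  });
});

consumer.on('error', console.error);
```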
Problem:
- the consumer reads messages just fine for a while, then suddenly stops at about 800k incoming messages (sometimes it makes it to ~2 million); it never receives all messages, at least they are not emitted via .on("message", …). If we look at the consumer group afterwards using Kafka Tool, we can see that it has committed offsets up to the latest state of the partitions, even though we are not committing manually and autoCommit is turned off.
- (the reason we see offset commits being made at all is that we have enabled offset commit during close, "consumer.close(true, …)"; still, the offsets should not be at the end of the partitions!)
- if we consume other topics with fewer messages (~500k), it works just fine
Questions:
- is there any chance that using multiple ConsumerGroup instances in the same node process at once can break the consuming or offset committing process?
- why is the final "consumer.close(true, …)" committing offsets for messages that we never received?
- is there a chance (or, better, a reason) that we might be dropping messages on ".on(\"message\", …)"?
- do we have to make any “batch” ack settings for consumers?
- is there a chance that pausing & resuming the consumer too often kills the connection or partition offsets?
Configuration:
ssl: false,
groupId: "a-group-id",
sessionTimeout: 30000,
protocol: ["roundrobin"],
fromOffset: "earliest",
migrateHLC: false,
migrateRolling: false,
fetchMaxBytes: 1024 * 100,
fetchMinBytes: 1,
fetchMaxWaitMs: 100,
autoCommit: false,
autoCommitIntervalMs: 5000
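(For context, this is roughly how options like the above are handed to the ConsumerGroup constructor; the zookeeper host and topic name in this sketch are placeholders, not our real values:)

```js
const { ConsumerGroup } = require('kafka-node');

const options = {
  host: 'localhost:2181', // placeholder zookeeper connection string
  ssl: false,
  groupId: 'a-group-id',
  sessionTimeout: 30000,
  protocol: ['roundrobin'],
  fromOffset: 'earliest',
  fetchMaxBytes: 1024 * 100,
  fetchMinBytes: 1,
  fetchMaxWaitMs: 100,
  autoCommit: false
  // ...plus the remaining options listed above
};

// One ConsumerGroup instance per topic, as described in the setup.
const consumer = new ConsumerGroup(options, 'topic-a'); // placeholder topic name
```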
We have hit some sort of wall here; thanks in advance for any guesses.
Chris
@hyperlink thanks again for your support and for your efforts to uphold trust in this lib; we were able to identify the reasons for the problems. I am happy to sum them up for anyone else who might be stuck in a similar situation in the future:
- using 1 or 30 partitions does not matter, and having a single instance or a cluster does not matter either; we ran the large-message test against all kinds of setups
- combining .setOffset() with manual consumer.commit() calls is a bad idea (sketched below)
- expecting .setOffset(topic, partition, 0) to work on a running consumer, even a paused one, is problematic and probably bad behavior for any kind of Kafka consumer
- when "experiencing missing messages" together with "committed high offsets", make sure you understand the topic you are consuming. We have a large number of topics here, with different owners. Keyed partitions may well be subject to heavy log compaction, depending on the data and on how frequently the topic is published to; compaction deletes older records per key, so the "missing" offsets may simply no longer exist in the log.
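To make the setOffset point concrete, here is a sketch of the problematic pattern (host, topic name, and offsets are placeholders): a forced commit can race with in-flight fetches and persist an offset the application never actually processed.

```js
const { ConsumerGroup } = require('kafka-node');

// Assumes a consumer like the ones configured earlier in this thread.
const consumer = new ConsumerGroup({
  host: 'localhost:2181', // placeholder zookeeper connection
  groupId: 'a-group-id',
  fromOffset: 'earliest',
  autoCommit: false
}, 'my-topic'); // placeholder topic

// Anti-pattern distilled from the findings above; do not copy:
consumer.pause();
consumer.setOffset('my-topic', 0, 0); // rewind partition 0 on a live (paused) consumer
consumer.commit(true, (err) => {      // force a commit while fetches may still be in flight
  if (err) console.error(err);
  // the committed offset can now point past messages the app never processed
  consumer.resume();
});
```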
@krystianity here's the updated test I wrote earlier to consume from 30 partitions with autoCommit off. I ran the test several times in a row (the test needs to be adjusted between runs) without resetting the kafka container and was able to consume up to 6.3 million messages in 527 seconds.
Setup topic: Kafka bin tools
Test code: ./test/large-cs-test.js

I wrote a simple test: a single consumer against a single broker and a single partition. I published 2.1 million messages and was able to consume them without an issue. Ran it several times. https://github.com/SOHU-Co/kafka-node/commit/283e6a88492168ed86993b683e5fc0868a469e26
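For readers who just want the shape of such a test, a stripped-down counting consumer might look like the sketch below; the host, group id, topic, and message count are placeholders, not the actual test code from the commit.

```js
const { ConsumerGroup } = require('kafka-node');

const EXPECTED = 2100000; // placeholder: number of messages published beforehand
let received = 0;

const consumer = new ConsumerGroup({
  host: 'localhost:2181',      // placeholder zookeeper connection
  groupId: 'large-test-group', // placeholder group id
  fromOffset: 'earliest',
  autoCommit: false
}, 'large-test-topic');        // placeholder topic

consumer.on('message', () => {
  received += 1;
  if (received === EXPECTED) {
    console.log('consumed all %d messages', received);
    consumer.close(true, () => process.exit(0)); // commit on close, then exit
  }
});

consumer.on('error', (err) => {
  console.error(err);
  process.exit(1);
});
```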
If you're interested in running the actual test, check out the large-message-test branch.
You will need to run the test without a timeout, since it can take about 14 minutes:
./node_modules/.bin/mocha --no-timeouts test/test.consumerGroup.js --grep "large"

How many consumers are in your group?