kafkajs: Consumer offset "stuck" on log compacted messages

Hello,

I’m looking for some help with an issue where my consumer is “stuck” at a certain offset. (Or rather at a set of offsets: one for each partition in the topic.)

To break it down:

  • I am trying to consume a log-compacted topic with 12 partitions (hundreds of millions of messages).
  • I deployed multiple instances of our consumer service.
  • The consumers started out great, but after a while they all ground to a halt. The offsets haven’t changed for a few days, and each partition is still lagging many millions of messages behind.
  • I made a local copy of KafkaJS and added some extra debug logs to do some digging. Here’s what I found (I’ve also sketched the loop in code right after this list):
    • For a given partition, the committed offset is n.
    • The message at offset n has been compacted away. The next available message is at, say, n + 13.
    • KafkaJS fetches messages starting at offset n.
    • It receives a few messages, but all of them have offsets < n: for example, one message at offset n - 1, or two messages at offsets n - 2 and n - 1.
    • In batch.js these messages are ignored (only messagesWithinOffset is used), so the result is treated as an empty batch.
    • The “empty” batch is skipped, and KafkaJS repeats the above steps.
    • My eachBatch callback is never triggered.
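
To make the loop easier to follow, here’s a rough paraphrase of the behaviour above. This is not the actual KafkaJS source; the offsets and the filter are only illustrative, but the shape matches what I saw in my debug logs:

```js
// Illustrative only: a paraphrase of the stuck loop, not KafkaJS internals.
// The consumer is positioned at offset n = 1000, but the records at
// 1000..1012 were removed by log compaction; the next real record is 1013.

const requestedOffset = 1000

// What the fetch actually returns: only records *before* the requested offset.
const fetchedMessages = [
  { offset: 998, key: 'a', value: '...' },
  { offset: 999, key: 'b', value: '...' },
]

// batch.js keeps only messages at or after the requested offset
// (messagesWithinOffset), so in this case nothing survives the filter.
const messagesWithinOffset = fetchedMessages.filter(
  message => message.offset >= requestedOffset
)

console.log(messagesWithinOffset.length) // 0 -> treated as an empty batch

// The empty batch is skipped, the offset stays at 1000, the next fetch asks
// for 1000 again, and eachBatch is never called: an infinite loop.
```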

I am not sure what is going wrong here: do I need to change some configuration? (I’m currently using defaults across the board.) Has the consumer group somehow entered a bad state? Or could this be a bug?

Advice would be greatly appreciated. Thanks in advance! 🙂

Most upvoted comments

Hi @tulios

Thanks for looking into this. I am indeed running 1.11.0. I tried 1.12.0-beta.0 after reading your suggestion, but alas, it didn’t fix the issue, although I do think it’s very closely related to ours!

I will try to break it down as I understand it. Please let me know if I’ve misunderstood something. 🙂

The issue you mentioned (PR #511)

Underlying Cause

KafkaJS could get stuck on the offset of a Control Record: it would filter out the Control Record, handle the resulting empty batch by doing nothing, fetch the same Control Record Batch again, and repeat.

Fix

The issue was fixed by incrementing the offset by 1 when a control batch was fetched.
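
If I’ve understood it correctly, the fix boils down to something like this. This is my paraphrase in illustrative code, not the actual diff, and the helper and field names are made up:

```js
// My paraphrase of the control-batch fix, not the actual PR #511 change.
// Names like isControlBatch / deliveredMessages are illustrative.

function resolveNextOffset({ currentOffset, deliveredMessages, isControlBatch }) {
  if (deliveredMessages.length > 0) {
    // Normal case: continue after the last message handed to the consumer.
    return deliveredMessages[deliveredMessages.length - 1].offset + 1
  }

  if (isControlBatch) {
    // The batch contained only a control record, which is never delivered
    // to the user. Step over it instead of refetching the same offset.
    return currentOffset + 1
  }

  // Otherwise the offset does not move, and the same fetch repeats.
  return currentOffset
}

// Example: stuck on a control record at offset 500, nothing delivered:
console.log(
  resolveNextOffset({ currentOffset: 500, deliveredMessages: [], isControlBatch: true })
) // -> 501
```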

In the Java Consumer

Useful comment in the Java consumer: https://github.com/apache/kafka/blob/9aa660786e46c1efbf5605a6a69136a1dac6edb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1499-L1505

This issue

Underlying Cause

KafkaJS can get stuck on the offset of a Record that has been compacted away: it fetches an “empty” batch, handles the “empty” batch by doing nothing, and repeats. (The fetch does return some records, but they are all from earlier offsets than requested, so they are ignored and the batch is treated as empty.)

Hacky Fix

This can be worked around by incrementing the offset by 1 whenever all of the fetched messages are from before the requested offset. (It might also be worth checking that fetchedOffset is less than highWatermark, i.e. that we haven’t reached the end of the partition yet.) I confirmed this by copying KafkaJS locally and hacking consumerGroup.js and batch.js a bit.
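
For reference, my local hack looks roughly like this (simplified, with illustrative names; the real change touches consumerGroup.js and batch.js):

```js
// Simplified version of my local hack; names are illustrative, not the
// exact consumerGroup.js / batch.js code.

function adjustOffsetForCompaction({ requestedOffset, fetchedMessages, highWatermark }) {
  const allBeforeRequested =
    fetchedMessages.length > 0 &&
    fetchedMessages.every(message => Number(message.offset) < requestedOffset)

  // Only step forward if we haven't reached the end of the partition yet.
  const notAtEndOfPartition = requestedOffset < Number(highWatermark)

  if (allBeforeRequested && notAtEndOfPartition) {
    // Everything at the requested offset was compacted away: skip one ahead.
    return requestedOffset + 1
  }

  return requestedOffset
}

// Example: stuck at offset 1000; the fetch only returned offsets 998 and 999.
console.log(
  adjustOffsetForCompaction({
    requestedOffset: 1000,
    fetchedMessages: [{ offset: '998' }, { offset: '999' }],
    highWatermark: '250000000',
  })
) // -> 1001
```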

“Proper” Fix

To match the Java Consumer, we could mirror how its Fetcher handles this case (see the comment linked below; I’ve sketched my reading of it after the link).

In the Java Consumer

Useful comment in the Java consumer: https://github.com/apache/kafka/blob/9aa660786e46c1efbf5605a6a69136a1dac6edb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1456-L1460
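
For completeness, here’s a rough sketch of what I think “matching the Java Consumer” would mean in KafkaJS terms. It’s only my reading of the linked Fetcher.java comments, and all the names are illustrative: the idea is to advance the position based on the offsets the fetched batch covers, not just on the messages that survive filtering.

```js
// Sketch only: my interpretation of the Java consumer's behaviour, with
// illustrative names. The position advances past the offsets covered by the
// fetched batch even when no messages are delivered to the user (because
// they were compacted away or were control records).

function nextFetchOffset({ requestedOffset, batchLastOffset, deliveredMessages }) {
  if (deliveredMessages.length > 0) {
    // Normal case: continue after the last delivered message.
    const last = deliveredMessages[deliveredMessages.length - 1]
    return Number(last.offset) + 1
  }

  // Nothing was delivered, but the fetch response still tells us which
  // offsets the batch covered; skip past them instead of refetching.
  if (batchLastOffset != null && Number(batchLastOffset) >= requestedOffset) {
    return Number(batchLastOffset) + 1
  }

  return requestedOffset
}

// Example: stuck at offset 1000, batch covers up to 1012, nothing delivered:
console.log(
  nextFetchOffset({ requestedOffset: 1000, batchLastOffset: '1012', deliveredMessages: [] })
) // -> 1013
```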