kafka-go: Panic when reading message from start of compacted topic

Describe the bug When trying to read from a compacted topic using the FirstOffset configuration without a consumer group, the library panics with the following stacktrace.

panic: markRead: negative count

goroutine 68 [running]:
github.com/segmentio/kafka-go.(*messageSetReader).markRead(0xc0004587a0)
    external/com_github_segmentio_kafka_go/message_reader.go:345 +0x11a
github.com/segmentio/kafka-go.(*messageSetReader).readMessageV2(0xc0004587a0, 0x2c709, 0xc000507ac8, 0xc000507ab8, 0x2c708, 0x2c708, 0xc000059800, 0xc0005078f8, 0x416b3b, 0xc0005078f8, ...)
    external/com_github_segmentio_kafka_go/message_reader.go:329 +0x49d
github.com/segmentio/kafka-go.(*messageSetReader).readMessage(0xc0004587a0, 0x2c709, 0xc000507ac8, 0xc000507ab8, 0x2c708, 0xc000507a5c, 0x17fd1b29700, 0x0, 0xc0004922a0, 0x0, ...)
    external/com_github_segmentio_kafka_go/message_reader.go:136 +0xc5
github.com/segmentio/kafka-go.(*Batch).readMessage(0xc000195880, 0xc000507ac8, 0xc000507ab8, 0x0, 0x17fd1b29700, 0x100000001, 0xc0004922a0, 0xc0004922a0, 0xc000507ab8, 0x2)
    external/com_github_segmentio_kafka_go/batch.go:240 +0x79
github.com/segmentio/kafka-go.(*Batch).ReadMessage(0xc000195880, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
    external/com_github_segmentio_kafka_go/batch.go:192 +0x11a
github.com/segmentio/kafka-go.(*reader).read(0xc000507ed8, 0xe30160, 0xc0004882c0, 0x2c709, 0xc0000e21e0, 0x0, 0x0, 0x0)
    external/com_github_segmentio_kafka_go/reader.go:1492 +0x3ec
github.com/segmentio/kafka-go.(*reader).run(0xc000507ed8, 0xe30160, 0xc0004882c0, 0x0)
    external/com_github_segmentio_kafka_go/reader.go:1310 +0x2d9
github.com/segmentio/kafka-go.(*Reader).start.func1(0xc0004d8000, 0xe30160, 0xc0004882c0, 0xc00004402c, 0x10, 0x0, 0xfffffffffffffffe, 0xc0004d8138)
    external/com_github_segmentio_kafka_go/reader.go:1211 +0x1d8
created by github.com/segmentio/kafka-go.(*Reader).start
    external/com_github_segmentio_kafka_go/reader.go:1191 +0x1a5

Kafka Version 2.4.0

To Reproduce Sadly, I'm unable to reproduce the issue, but maybe you've seen it before or can point me to what I could check. To be precise, the application kept panicking even after being restarted over and over again. My only solution was to truncate the topic, which brought the consumer back to life. The messages themselves were not special at all: I exported them and re-imported them into a different cluster and could not make the application fail there. So the problem must (additionally?!) be connected to some internal state of the Kafka cluster.

Expected behavior No panic at all should happen.

Additional context The version of the library in use is 0.4.30
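
For reference, a minimal sketch of the reader setup described above (FirstOffset without a consumer group); the broker address and topic name are placeholders, not taken from the original report:

```go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Plain partition reader (no GroupID). Broker and topic are placeholders.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"},
		Topic:     "some-compacted-topic",
		Partition: 0,
	})
	defer r.Close()

	// Without a consumer group the start position is set explicitly.
	if err := r.SetOffset(kafka.FirstOffset); err != nil {
		log.Fatal(err)
	}

	for {
		// The panic above was observed while a call like this was reading a batch.
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("offset=%d key=%s", msg.Offset, string(msg.Key))
	}
}
```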

About this issue

  • Original URL
  • State: open
  • Created 2 years ago
  • Reactions: 8
  • Comments: 15 (5 by maintainers)

Most upvoted comments

@achille-roussel today I was finally able to debug the situation on one of our Kafka clusters. The application is still on version 0.4.30, so all the line numbers you'll see below refer to that specific version.

The library started to read a batch of messages via https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L127. The result can be seen here: [screenshot: parsed batch header with a message count of 0]. As you can see, the message count in the header is 0, and that is already the issue: Kafka sends around some empty batches and the library does not expect this. Due to a special situation (see below), this only becomes a problem when two subsequent batches are empty.

But let's move on. As you can see in the stack trace above, I created the screenshot just before the header was read a second time. After the header was read at https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L127, and because the count is 0, the code does another read of the next header at https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L249. Unfortunately (as said before) the next batch is also empty, resulting in this state: [screenshot: second batch header, also with a count of 0].

At this point the code expects a batch with one or more messages, so it tries to read message data in the next lines of readMessageV2, but what it actually reads is, of course, yet another batch header. This can then break easily: either the misread data causes the "header" value of a message to become negative, or it even gets as far as calling markRead, which panics because the count is 0. The parsed data just before markRead was called looked like this in my scenario: [screenshot: misparsed message data just before the panic].

As said before, the wild thing is that because readHeader is called twice, an empty batch only becomes a problem when two of them are returned back to back; a single empty batch "accidentally" works.

Important to note here: I found that empty batches only exist from Kafka message format v2 onwards. You can find the corresponding Kafka code at https://github.com/apache/kafka/blob/a3a4323a5a2ea4ba1820da58a8c31532e8864f0d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L219
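
To make the misalignment easier to picture, here is a sketch of the v2 record batch header layout as described in the Kafka protocol documentation; the struct and field names below are illustrative, not kafka-go's internal types:

```go
// Layout of a message format v2 record batch on the wire, per the Kafka
// protocol docs. When RecordCount is 0 the batch ends right after this
// header, so a reader that still expects record bytes will instead consume
// the BaseOffset of the *next* batch and misinterpret everything after it.
type recordBatchHeader struct {
	BaseOffset           int64
	BatchLength          int32
	PartitionLeaderEpoch int32
	Magic                int8 // 2 for this format
	CRC                  int32
	Attributes           int16
	LastOffsetDelta      int32
	FirstTimestamp       int64
	MaxTimestamp         int64
	ProducerID           int64
	ProducerEpoch        int16
	BaseSequence         int32
	RecordCount          int32 // 0 for the empty batches seen above
	// followed by RecordCount records (none, if the batch is empty)
}
```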

So what to do: IMHO, first of all the readHeader call should be removed from https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L249 - I don't see a good reason for calling this method a second time. It made sense back in v1, where the read method internally loops, but for v2 the outer readHeader is always called first - or am I missing something? Second, the readMessage method should check whether the batch is empty for v2 at https://github.com/segmentio/kafka-go/blob/v0.4.30/message_reader.go#L136 - and if it is, handle it in some way.
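
A minimal, self-contained sketch of the check proposed above; the types below are simplified stand-ins for the internal messageSetReader, not the actual kafka-go code:

```go
package main

import "fmt"

// batchHeader is a simplified stand-in for a parsed batch header
// (only the fields relevant to the bug are kept).
type batchHeader struct {
	version int // message format version; 2 allows empty batches
	count   int // number of records in the batch
}

// setReader mimics the part of the reader that tracks the current
// header and the number of records still to be consumed.
type setReader struct {
	batches []batchHeader // pre-parsed headers standing in for the byte stream
	pos     int
	header  batchHeader
	count   int
}

// readHeader advances to the next batch header, if any.
func (r *setReader) readHeader() bool {
	if r.pos >= len(r.batches) {
		return false
	}
	r.header = r.batches[r.pos]
	r.count = r.header.count
	r.pos++
	return true
}

// readMessage skips over empty v2 batches (which Kafka may return, e.g. on
// compacted topics) before consuming a record, instead of misreading the
// next batch header as record data.
func (r *setReader) readMessage() bool {
	for r.count == 0 {
		if r.header.version != 2 || !r.readHeader() {
			return false // end of data (or an unexpected empty pre-v2 set)
		}
	}
	r.count-- // safe: count > 0 here, so it can never go negative
	return true
}

func main() {
	r := &setReader{batches: []batchHeader{
		{version: 2, count: 0}, // two empty batches back to back,
		{version: 2, count: 0}, // the exact situation from the report
		{version: 2, count: 3},
	}}
	r.readHeader()
	read := 0
	for r.readMessage() {
		read++
	}
	fmt.Println("records read:", read) // 3
}
```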

I think if these points are tackled, it should be possible to fix all of the previously mentioned deserialization issues.

Let me know if you need any more details - I've kept all the binary response data in case it is needed again. Unfortunately I can't share it here, as it contains some private data, but I hope the information given is already enough to reproduce the situation.

To me it now rather looks like some other issue that merely results in this error. Probably the deserialization gets off track somewhere else and then ends up with a negative number right there. That would also explain the other deserialization error from the first post.

That's entirely possible. Addressing the panic there still seems like the right thing to do, though (we shouldn't trust data coming from the network), and not crashing the application might give us more opportunities to figure out the conditions that triggered the issue.