kafka-go: Unable to compress messages

This is likely user error but I’m not able to compress messages.

I’ve set them to use snappy with the exact code from README.md (have also tried lz4):

	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "topic-A",
		CompressionCodec: snappy.NewCompressionCodec(),
	})

However, when the messages reach Kafka, they are not compressed:

screen shot 2018-09-14 at 4 02 59 pm

Code: https://gist.github.com/dkoston/aab79446dc411c9d125a412ea20802c4

Perhaps there’s some broker configuration I need to do? I was under the impression that each message itself contains bits telling the broker what compression codec is used on it’s contents and it’s unnecessary to specify a codec at the broker level if you do so at the producer/writer.

If I add some debug to the client, I see that the messages are running through transform() so I’d expect them to be compressed with lz4 and marked as such:

$ go run main.go
2018/09/14 15:41:29 Using snappy for message compression
2018/09/14 15:41:29 Writing 10 messages to topic-A
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 Reading 10 messages from topic-A as group1
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 16:03:36 |----------|--------------|---------------|
2018/09/14 16:03:36 | Messages | Uncompressed | Compressed    |
2018/09/14 16:03:36 |----------|--------------|---------------|
2018/09/14 16:03:36 |       10 |           10 |             0 |
2018/09/14 16:03:36 |----------|--------------|---------------|

I added:

diff --git a/message.go b/message.go
index 2d8b775..12ed4c2 100644
--- a/message.go
+++ b/message.go
@@ -5,6 +5,7 @@ import (
        "bytes"
        "fmt"
        "time"
+       "log"
 )

 // Message is a data structure representing kafka messages.
@@ -54,9 +55,12 @@ func (msg Message) message() message {

 func (msg Message) encode() (Message, error) {
        if msg.CompressionCodec == nil {
+               log.Printf("[kafka-go]: not compressing")
                return msg, nil
        }

+       log.Printf("[kafka-go]: compressing")
+
        var err error
        msg.Value, err = transform(msg.Value, msg.CompressionCodec.Encode)
        return msg, err
@@ -65,6 +69,7 @@ func (msg Message) encode() (Message, error) {
 func (msg Message) decode() (Message, error) {
        c := msg.message().Attributes & compressionCodecMask
        if c == CompressionNoneCode {
+               log.Printf("[kafka-go]: not compressed: %v", c)
                return msg, nil
        }

@@ -73,6 +78,8 @@ func (msg Message) decode() (Message, error) {
                return msg, fmt.Errorf("codec %d not imported.", msg.CompressionCodec)
        }

+       log.Printf("[kafka-go]: compressed")
+
        var err error
        msg.Value, err = transform(msg.Value, codec.Decode)
        return msg, err

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 16 (7 by maintainers)

Most upvoted comments

I’m going to close this issue now. I don’t believe it to be an issue ever since #135.

@stevevls Still have this problem, my version is v0.4.21