kafka-go: Unable to compress messages
This is likely user error, but I'm not able to compress messages.
I've set the writer to use snappy
with the exact code from README.md (I have also tried lz4):
w := kafka.NewWriter(kafka.WriterConfig{
	Brokers:          []string{"localhost:9092"},
	Topic:            "topic-A",
	CompressionCodec: snappy.NewCompressionCodec(),
})
However, when the messages reach Kafka, they are not compressed:
Code: https://gist.github.com/dkoston/aab79446dc411c9d125a412ea20802c4
Perhaps there's some broker configuration I need to do? I was under the impression that each message itself contains bits telling the broker which compression codec is used on its contents, and that it's unnecessary to specify a codec at the broker level if you do so at the producer/writer.
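To make the "bits in each message" idea concrete, here is a minimal sketch of how the codec can be read from a message's attributes byte. It assumes Kafka's documented layout, where the low three bits of the attributes hold the codec ID (0 none, 1 gzip, 2 snappy, 3 lz4, 4 zstd). The helper name `codecName` is hypothetical and is not part of kafka-go; the mask constant is redefined here so the example stands alone.

```go
package main

import "fmt"

// compressionCodecMask selects the low three bits of the attributes byte,
// which carry the codec ID in Kafka's message format.
const compressionCodecMask int8 = 0x07

// codecName is a hypothetical helper (not a kafka-go API) that maps the
// codec bits of an attributes byte to a readable name.
func codecName(attributes int8) string {
	switch attributes & compressionCodecMask {
	case 0:
		return "none"
	case 1:
		return "gzip"
	case 2:
		return "snappy"
	case 3:
		return "lz4"
	case 4:
		return "zstd"
	default:
		return "unknown"
	}
}

func main() {
	// 0x0A has a higher attribute bit set in addition to codec 2 (snappy);
	// the mask ignores everything above the low three bits.
	for _, attrs := range []int8{0x00, 0x02, 0x03, 0x0A} {
		fmt.Printf("attributes=%d codec=%s\n", attrs, codecName(attrs))
	}
}
```

If the reader reports codec 0 for every message, as in the log output in this issue, the attributes byte stored with the message has no codec bits set, regardless of what happened to the payload on the writer side.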
If I add some debug logging to the client, I see that the messages are running through transform(),
so I'd expect them to be compressed with the configured codec (snappy in the run below) and marked as such:
$ go run main.go
2018/09/14 15:41:29 Using snappy for message compression
2018/09/14 15:41:29 Writing 10 messages to topic-A
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 [kafka-go]: compressing
2018/09/14 15:41:30 Reading 10 messages from topic-A as group1
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 15:41:42 [kafka-go]: not compressed: 0
2018/09/14 16:03:36 |----------|--------------|---------------|
2018/09/14 16:03:36 | Messages | Uncompressed | Compressed |
2018/09/14 16:03:36 |----------|--------------|---------------|
2018/09/14 16:03:36 | 10 | 10 | 0 |
2018/09/14 16:03:36 |----------|--------------|---------------|
This is the debug logging I added:
diff --git a/message.go b/message.go
index 2d8b775..12ed4c2 100644
--- a/message.go
+++ b/message.go
@@ -5,6 +5,7 @@ import (
 	"bytes"
 	"fmt"
 	"time"
+	"log"
 )
 // Message is a data structure representing kafka messages.
@@ -54,9 +55,12 @@ func (msg Message) message() message {
 func (msg Message) encode() (Message, error) {
 	if msg.CompressionCodec == nil {
+		log.Printf("[kafka-go]: not compressing")
 		return msg, nil
 	}
+	log.Printf("[kafka-go]: compressing")
+
 	var err error
 	msg.Value, err = transform(msg.Value, msg.CompressionCodec.Encode)
 	return msg, err
@@ -65,6 +69,7 @@ func (msg Message) encode() (Message, error) {
 func (msg Message) decode() (Message, error) {
 	c := msg.message().Attributes & compressionCodecMask
 	if c == CompressionNoneCode {
+		log.Printf("[kafka-go]: not compressed: %v", c)
 		return msg, nil
 	}
@@ -73,6 +78,8 @@ func (msg Message) decode() (Message, error) {
 		return msg, fmt.Errorf("codec %d not imported.", msg.CompressionCodec)
 	}
+	log.Printf("[kafka-go]: compressed")
+
 	var err error
 	msg.Value, err = transform(msg.Value, codec.Decode)
 	return msg, err
About this issue
- State: closed
- Created 6 years ago
- Comments: 16 (7 by maintainers)
@stevevls I still have this problem; my version is v0.4.21.