kafka-go: Why is writing messages so slow?
Describe the bug: I compared sarama and kafka-go for writing a single message. kafka-go waits about 2 seconds to finish, but sarama finishes immediately. Why is it so slow?
These are the stats:
Dials: 1 Writes: 1 Messages: 1 Bytes: 17 Rebalances: 1 Errors: 0
DialTime: {
	Avg: 7.452734 ms Min: 7.452734 ms Max: 7.452734 ms
}
WriteTime: {
	Avg: 1.017793708 s Min: 1.017793708 s Max: 1.017793708 s
}
WaitTime: {
	Avg: 5.342901 ms Min: 5.342901 ms Max: 5.342901 ms
}
This is the code:
w := kafka.NewWriter(kafka.WriterConfig{
	Brokers:  []string{"127.0.0.1:1024"},
	Topic:    "test1",
	Balancer: &kafka.LeastBytes{},
})
w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
)
fmt.Printf("%+v\n", w.Stats())
w.Close()
About this issue
- State: closed
- Created 5 years ago
- Comments: 17 (3 by maintainers)
You can set BatchTimeout in WriterConfig to avoid the long wait:
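A minimal sketch of that suggestion, reusing the broker address and topic from the report above. The 10 ms value is an arbitrary assumption for illustration, not a figure from this thread, and running it requires a reachable broker.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"127.0.0.1:1024"},
		Topic:    "test1",
		Balancer: &kafka.LeastBytes{},
		// Assumed value: flush incomplete batches after 10 ms
		// instead of waiting for the default timeout.
		BatchTimeout: 10 * time.Millisecond,
	})

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
	)
	if err != nil {
		log.Printf("write failed: %v", err)
	}

	// WriteTime in the stats should now reflect the shorter timeout.
	fmt.Printf("%+v\n", w.Stats())

	if err := w.Close(); err != nil {
		log.Printf("close failed: %v", err)
	}
}
```

A shorter timeout trades throughput for latency: batches flush sooner but may carry fewer messages each.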
This is a stupid design. WriteMessages should batch internally instead of us having to batch it manually. Intuition would make it seem like you can call WriteMessages in a loop and kafka-go would batch for you and send them out when it received the batch amount or the timeout. This is how all other libraries I’ve used work.
Instead, we have to create an entire batching system to call WriteMessages which then batches as well? You might as well remove the entire batch system from this library since you have to create one on top of it.
Same issue here, but I think I have figured out the real reason why it is so slow.
Here is the code snippet, in this method:
The batchTimeout is time.Second. So if there are not enough messages to fill a batch, every WriteMessages call has to wait at least one second!
@shukyoo That's why the WriteTime in your stats (Avg: 1.017793708 s) is nearly one second.
I am confused by this design too, and I ran into the 1 second problem as well. If this is intended, I’d choose another library.
I think the WriteMessages() method does too much. It is also not clear how it is supposed to be used, even when sending messages in chunks.
A more sensible approach IMO would be to add new methods as follows:
Building on the API above, we could add further methods to handle timeouts:
I have a working implementation with tests and could contribute if you agree @achille-roussel. @arianitu it would be nice to have your feedback as well.