sarama: Async producer can overflow itself at high rate
Versions
Please specify real version numbers or git SHAs, not just “Latest” since that changes fairly regularly.
Sarama Version: b1da1753dedcf77d053613b7eae907b98a2ddad5
Kafka Version: Irrelevant
Go Version: b1da1753dedcf77d053613b7eae907b98a2ddad5
Configuration
What configuration values are you using for Sarama and Kafka?
Sarama async producer:
conf := sarama.NewConfig()
conf.Metadata.Retry.Max = 1
conf.Metadata.Retry.Backoff = 250 * time.Millisecond
conf.Producer.RequiredAcks = sarama.RequiredAcks(sarama.WaitForLocal)
conf.Producer.Timeout = 1 * time.Second
conf.Producer.MaxMessageBytes = 16 << 20 // 16MB
conf.Producer.Flush.Bytes = 16 << 20 // 16MB
conf.Producer.Flush.Frequency = time.Minute
conf.Producer.Compression = sarama.CompressionNone // otherwise Kafka goes nuts
conf.Producer.Return.Errors = true
conf.Producer.Partitioner = NewIdentityPartitioner
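For context, the surrounding wiring might look roughly like the sketch below. The broker address is a placeholder, NewIdentityPartitioner is the reporter's own constructor (not part of sarama), and the topic name is taken from the log line further down; only the sarama calls themselves (NewAsyncProducer, Errors, Input) are real API.

// Sketch only: roughly how a producer built from the config above could be used.
producer, err := sarama.NewAsyncProducer([]string{"kafka-broker:9092"}, conf)
if err != nil {
	log.Fatal(err)
}

// Producer.Return.Errors = true, so failed messages come back on Errors().
go func() {
	for err := range producer.Errors() {
		log.Printf("Kafka producer err: %v", err)
	}
}()

producer.Input() <- &sarama.ProducerMessage{
	Topic: "requests", // the topic from the error log below
	Value: sarama.StringEncoder("payload"),
}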
Logs
Sarama logs:
2017-01-09T23:30:21.504 myhost 2017/01/09 23:30:19 Kafka producer err: kafka: Failed to produce message to topic requests: kafka server: Message was too large, server rejected it to avoid allocation error.
Problem Description
The problem is in brokerProducer.run in async_producer.go (the function patched below):
We produce messages at such a rate (100K/s+) that the select always picks up processing of new incoming messages over rolling over and flushing the existing ones.
I applied the following patch:
diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
index e7ae8c2..13a888b 100644
--- a/vendor/github.com/Shopify/sarama/async_producer.go
+++ b/vendor/github.com/Shopify/sarama/async_producer.go
@@ -2,6 +2,7 @@ package sarama
import (
"fmt"
+ "log"
"sync"
"time"
@@ -249,6 +250,7 @@ func (p *asyncProducer) dispatcher() {
}
if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
+ log.Printf("Got message size bigger than allowed max message bytes %d > %d", msg.byteSize(), p.conf.Producer.MaxMessageBytes)
p.returnError(msg, ErrMessageSizeTooLarge)
continue
}
@@ -577,9 +579,12 @@ func (bp *brokerProducer) run() {
var output chan<- *produceSet
Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
+ wasReadyTimes := 0
+
for {
select {
case msg := <-bp.input:
+ log.Println("INPUT MESSAGE")
if msg == nil {
bp.shutdown()
return
@@ -625,14 +630,23 @@ func (bp *brokerProducer) run() {
bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
}
case <-bp.timer:
+ log.Println("TIMER FIRED")
bp.timerFired = true
case output <- bp.buffer:
+ wasReadyTimes = 0
+ log.Println("ROLL OVER")
bp.rollOver()
case response := <-bp.responses:
+ log.Println("HANDLING RESPONSE")
bp.handleResponse(response)
}
if bp.timerFired || bp.buffer.readyToFlush() {
+ log.Println("READY TO FLUSH YAY")
+ wasReadyTimes++
+ if wasReadyTimes > 10 {
+ log.Fatal("We were ready for a long time, but it did not happen. Exiting.")
+ }
output = bp.output
} else {
output = nil
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index 9fe5f79..91a127f 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -1,6 +1,9 @@
package sarama
-import "time"
+import (
+ "log"
+ "time"
+)
type partitionSet struct {
msgs []*ProducerMessage
@@ -147,6 +150,7 @@ func (ps *produceSet) readyToFlush() bool {
return true
// If we've passed the byte trigger-point
case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
+ log.Printf("ready to flush because buffer bytes are big enough: %d >= %d", ps.bufferBytes, ps.parent.conf.Producer.Flush.Bytes)
return true
default:
return false
The output looks like this: https://gist.github.com/bobrik/27071d61d5ec98ed15ffd1cb5331f3f4.
Without log.Fatal I’ve seen buffer sizes go as high as 40MB, which is bigger than message.max.bytes=33554432 in our cluster.
The solution is probably to do rollOver outside of select.
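A minimal, self-contained sketch of that idea follows. The names (batcher, flushBytes, etc.) are illustrative, not sarama's internals: the point is only that the flush happens after the select instead of competing with new input as a select case, so the buffer is handed off as soon as it crosses the byte threshold.

package main

import "time"

// Illustrative batcher, not sarama's brokerProducer: the flush is done
// unconditionally after the select rather than racing against new input
// inside it.
type batcher struct {
	input      chan []byte
	output     chan [][]byte
	buffer     [][]byte
	bufferSize int
	flushBytes int
	timerFired bool
}

func (b *batcher) run() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case msg, ok := <-b.input:
			if !ok {
				return
			}
			b.buffer = append(b.buffer, msg)
			b.bufferSize += len(msg)
		case <-ticker.C:
			b.timerFired = true
		}
		// Flush outside the select: once the buffer crosses the byte
		// threshold, it is handed off before any more input is read.
		if b.timerFired || b.bufferSize >= b.flushBytes {
			b.output <- b.buffer
			b.buffer, b.bufferSize, b.timerFired = nil, 0, false
		}
	}
}

func main() {
	b := &batcher{
		input:      make(chan []byte),
		output:     make(chan [][]byte, 1),
		flushBytes: 16 << 20, // mirrors Producer.Flush.Bytes above
	}
	done := make(chan struct{})
	go func() { b.run(); close(done) }()
	b.input <- []byte("example message")
	close(b.input)
	<-done
}

The trade-off is that input is not drained while the flush send blocks, which is essentially what the proposal asks the broker producer to accept.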
About this issue
- State: closed
- Created 7 years ago
- Reactions: 1
- Comments: 26 (21 by maintainers)
I’m going to “think aloud”. A few things don’t line up:
1. The broker is returning MESSAGE_TOO_LARGE.
2. Producer.MaxMessageBytes is 16MB, below the broker's message.max.bytes of 33554432, so no individual message we send should exceed the broker's limit.
3. Flushing earlier (keeping the produce requests smaller) makes the MESSAGE_TOO_LARGE error go away.
1 is inconsistent with 2, since the broker should only return that error when the message we’re sending is larger than the configured maximum.
2 is inconsistent with 3, since the maximum request size and the maximum message size have nothing to do with each other.
Also worth noting: there’s nothing unusual about the config here, and while the message rate is high, Sarama has been benchmarked and stress-tested fairly thoroughly without anybody else reporting this issue.
At this point I can’t even generate a consistent hypothesis which explains all of the facts. So let’s get some more data and see if something pops:
- What happens if you bump producerMessageOverhead from 26 to 34?
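For a sense of what that question is probing (my own back-of-the-envelope, not from the thread): if the per-message overhead is under-counted by 8 bytes, then the Flush.Bytes trigger and the MaxMessageBytes check both run against a total that lags the real wire size by 8 bytes per buffered message. That alone would not explain a 40MB buffer, but it is one variable worth ruling out.

// Back-of-the-envelope only: how far the accounted batch size could lag the
// actual wire size if per-message overhead is under-counted by 8 bytes.
package main

import "fmt"

func main() {
	const (
		accountedOverhead = 26     // bytes per message assumed by the accounting
		actualOverhead    = 34     // hypothetical bytes per message on the wire
		messagesPerFlush  = 100000 // roughly one second of traffic at 100K/s
	)
	undercount := (actualOverhead - accountedOverhead) * messagesPerFlush
	fmt.Printf("accounting lags the wire size by %d bytes (~%.1f KB) per flush\n",
		undercount, float64(undercount)/1024)
}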