sarama: Async producer can overflow itself at high rate

Versions

Please specify real version numbers or git SHAs, not just “Latest” since that changes fairly regularly.

Sarama Version: b1da1753dedcf77d053613b7eae907b98a2ddad5
Kafka Version: Irrelevant
Go Version: b1da1753dedcf77d053613b7eae907b98a2ddad5

Configuration

What configuration values are you using for Sarama and Kafka?

Sarama async producer:

	conf := sarama.NewConfig()
	conf.Metadata.Retry.Max = 1
	conf.Metadata.Retry.Backoff = 250 * time.Millisecond
	conf.Producer.RequiredAcks = sarama.RequiredAcks(sarama.WaitForLocal)
	conf.Producer.Timeout = 1 * time.Second
	conf.Producer.MaxMessageBytes = 16 << 20 // 16MB
	conf.Producer.Flush.Bytes = 16 << 20 // 16MB
	conf.Producer.Flush.Frequency = time.Minute
	conf.Producer.Compression = sarama.CompressionNone // otherwise Kafka goes nuts
	conf.Producer.Return.Errors = true
	conf.Producer.Partitioner = NewIdentityPartitioner

Logs

Sarama logs:

2017-01-09T23:30:21.504 myhost 2017/01/09 23:30:19 Kafka producer err: kafka: Failed to produce message to topic requests: kafka server: Message was too large, server rejected it to avoid allocation error.

Problem Description

The problem is in this function (brokerProducer.run, instrumented in the patch below):

We produce messages at such a high rate (100K/s+) that select always picks processing new incoming messages over rolling over and flushing the existing ones.
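To make the suspected mechanism concrete, here is a minimal, self-contained sketch of a loop with the same shape: one select that both accepts input and offers the buffer for flushing. It is not Sarama's code; accumulate, flushed and the integer "messages" are hypothetical names for illustration. Go's select chooses uniformly at random among the cases that are ready, so the sketch assumes the flush channel has no receiver ready. In that situation the send case can never be chosen and the buffer grows past its threshold for as long as input keeps arriving. Whether this is what actually happens in the producer is an assumption, not something the logs confirm.

```go
package main

import "fmt"

// accumulate mimics a select loop that accepts input and offers a buffer for
// flushing in the same select. All names are hypothetical; this is not Sarama.
// It returns the final buffer size and whether it exceeded flushBytes.
func accumulate(msgs []int, flushBytes int) (bufferBytes int, overshoot bool) {
	input := make(chan int, len(msgs))
	for _, m := range msgs {
		input <- m
	}
	close(input)

	flushed := make(chan int) // unbuffered, and nothing ever receives from it
	var output chan<- int     // nil (case disabled) until the buffer is ready

	for {
		select {
		case size, ok := <-input:
			if !ok {
				// Input exhausted: report how large the buffer became.
				return bufferBytes, bufferBytes > flushBytes
			}
			bufferBytes += size
		case output <- bufferBytes:
			// Never reached in this sketch: the receiver is never ready.
			bufferBytes = 0
		}

		// Same pattern as the loop under discussion: enable the send case
		// only once the buffer is ready to flush.
		if bufferBytes >= flushBytes {
			output = flushed
		} else {
			output = nil
		}
	}
}

func main() {
	msgs := make([]int, 100)
	for i := range msgs {
		msgs[i] = 1000
	}
	total, over := accumulate(msgs, 16000)
	fmt.Println(total, over) // prints: 100000 true
}
```

The point of the sketch is only that "ready to flush" does not bound the buffer: the bound depends on how quickly the other side of the output channel becomes ready relative to the input rate.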

I applied the following patch:

diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
index e7ae8c2..13a888b 100644
--- a/vendor/github.com/Shopify/sarama/async_producer.go
+++ b/vendor/github.com/Shopify/sarama/async_producer.go
@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"fmt"
+	"log"
 	"sync"
 	"time"
 
@@ -249,6 +250,7 @@ func (p *asyncProducer) dispatcher() {
 		}
 
 		if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
+			log.Printf("Got message size bigger than allowed max message bytes %d > %d", msg.byteSize(), p.conf.Producer.MaxMessageBytes)
 			p.returnError(msg, ErrMessageSizeTooLarge)
 			continue
 		}
@@ -577,9 +579,12 @@ func (bp *brokerProducer) run() {
 	var output chan<- *produceSet
 	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
 
+	wasReadyTimes := 0
+
 	for {
 		select {
 		case msg := <-bp.input:
+			log.Println("INPUT MESSAGE")
 			if msg == nil {
 				bp.shutdown()
 				return
@@ -625,14 +630,23 @@ func (bp *brokerProducer) run() {
 				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
 			}
 		case <-bp.timer:
+			log.Println("TIMER FIRED")
 			bp.timerFired = true
 		case output <- bp.buffer:
+			wasReadyTimes = 0
+			log.Println("ROLL OVER")
 			bp.rollOver()
 		case response := <-bp.responses:
+			log.Println("HANDLING RESPONSE")
 			bp.handleResponse(response)
 		}
 
 		if bp.timerFired || bp.buffer.readyToFlush() {
+			log.Println("READY TO FLUSH YAY")
+			wasReadyTimes++
+			if wasReadyTimes > 10 {
+				log.Fatal("We were ready for a long time, but it did not happen. Exiting.")
+			}
 			output = bp.output
 		} else {
 			output = nil
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index 9fe5f79..91a127f 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -1,6 +1,9 @@
 package sarama
 
-import "time"
+import (
+	"log"
+	"time"
+)
 
 type partitionSet struct {
 	msgs        []*ProducerMessage
@@ -147,6 +150,7 @@ func (ps *produceSet) readyToFlush() bool {
 		return true
 	// If we've passed the byte trigger-point
 	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
+		log.Printf("ready to flush because buffer bytes are big enough: %d >= %d", ps.bufferBytes, ps.parent.conf.Producer.Flush.Bytes)
 		return true
 	default:
 		return false

The output looks like this: https://gist.github.com/bobrik/27071d61d5ec98ed15ffd1cb5331f3f4.

Without log.Fatal I’ve seen buffer sizes go as high as 40MB, which is bigger than message.max.bytes=33554432 in our cluster.

The solution is probably to do rollOver outside of select.
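A rough sketch of what "rollOver outside of select" could look like, under the assumption that a blocking flush is acceptable: once the buffer reaches the threshold, the loop stops accepting input until the buffer has been handed off. The names run, maxBatch and the integer "messages" are hypothetical; this is not a patch against Sarama.

```go
package main

import "fmt"

// run accepts message sizes from input and hands the accumulated buffer to
// output as soon as it reaches flushBytes. The send is a plain blocking send
// outside any select, so no further input is accepted until the flush is done.
func run(input <-chan int, output chan<- int, flushBytes int) {
	bufferBytes := 0
	for size := range input {
		bufferBytes += size
		if bufferBytes >= flushBytes {
			output <- bufferBytes // blocks until the receiver takes the batch
			bufferBytes = 0
		}
	}
	if bufferBytes > 0 {
		output <- bufferBytes // flush the remainder on shutdown
	}
	close(output)
}

// maxBatch feeds msgs through run and returns the largest batch it produced.
func maxBatch(msgs []int, flushBytes int) int {
	input := make(chan int, len(msgs))
	for _, m := range msgs {
		input <- m
	}
	close(input)

	output := make(chan int)
	go run(input, output, flushBytes)

	largest := 0
	for batch := range output {
		if batch > largest {
			largest = batch
		}
	}
	return largest
}

func main() {
	msgs := make([]int, 100)
	for i := range msgs {
		msgs[i] = 1000
	}
	fmt.Println(maxBatch(msgs, 16000)) // prints: 16000
}
```

With this shape a batch can exceed the threshold by at most one message. The trade-off is that the loop does nothing else while the send blocks, so a real change would still need to keep servicing broker responses, the flush timer, and shutdown.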

About this issue

  • State: closed
  • Created 7 years ago
  • Reactions: 1
  • Comments: 26 (21 by maintainers)

Most upvoted comments

I’m going to “think aloud”. A few things don’t line up:

  1. The largest message we’re actually seeing (per your logs) is ~20KB but the maximum message size is configured to be much larger both on the client and the broker.
  2. The broker is returning MESSAGE_TOO_LARGE.
  3. Limiting the maximum request size (not the maximum message size) makes the MESSAGE_TOO_LARGE error go away.

1 is inconsistent with 2, since the broker should only return that error when the message we’re sending is larger than the configured maximum.

2 is inconsistent with 3, since the maximum request size and the maximum message size have nothing to do with each other.

Also worth noting: there’s nothing unusual about the config here, and while the message rate is high, Sarama has been benchmarked and stress-tested fairly thoroughly without anybody else reporting this issue.


At this point I can’t even generate a consistent hypothesis which explains all of the facts. So let’s get some more data and see if something pops:

  • Is there anything interesting in the broker logs when this occurs? A stack trace, or a log indicating the too-large message, or something?
  • Does anything happen differently if you change producerMessageOverhead from 26 to 34?
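For context on the two numbers in the last question: 26 and 34 match the per-message framing overhead of the Kafka message format version 0 and version 1 (version 1 adds an 8-byte timestamp). The sketch below spells out that arithmetic. It assumes producerMessageOverhead is meant to approximate this framing; how Sarama uses the constant internally is not shown in this thread, and overhead and estimatedBytes are hypothetical helpers.

```go
package main

import "fmt"

// Field sizes, in bytes, of one message inside a Kafka message set.
const (
	offsetBytes      = 8
	messageSizeBytes = 4
	crcBytes         = 4
	magicBytes       = 1
	attributesBytes  = 1
	timestampBytes   = 8 // present only in message format v1
	keyLengthBytes   = 4
	valueLengthBytes = 4
)

// overhead returns the framing bytes added around key and value for the
// given message format version (0 or 1).
func overhead(version int) int {
	n := offsetBytes + messageSizeBytes + crcBytes + magicBytes +
		attributesBytes + keyLengthBytes + valueLengthBytes
	if version >= 1 {
		n += timestampBytes
	}
	return n
}

// estimatedBytes is a hypothetical buffer-size estimate: count messages of
// payloadBytes each, plus perMessage bytes of framing for every message.
func estimatedBytes(count, payloadBytes, perMessage int) int {
	return count * (payloadBytes + perMessage)
}

func main() {
	fmt.Println(overhead(0), overhead(1)) // prints: 26 34

	// How much an estimate drifts if the overhead is undercounted by 8 bytes.
	v0 := estimatedBytes(1000, 20000, overhead(0))
	v1 := estimatedBytes(1000, 20000, overhead(1))
	fmt.Println(v1 - v0)
}
```

An 8-byte undercount per message is small for ~20KB messages, so if changing the constant alters the behaviour, that would itself be informative.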