kafka-go: Why is writing a message so slow?

Describe the bug I compared writing a message with sarama and kafka-go. kafka-go takes about 2 seconds to finish, while sarama finishes immediately. Why is it so slow?

These are the stats:

	Dials: 1 Writes: 1 Messages: 1 Bytes: 17 Rebalances: 1 Errors: 0 DialTime: {
		Avg: 7.452734 ms Min: 7.452734 ms Max: 7.452734 ms
	}
	WriteTime: {
		Avg: 1.017793708 s Min: 1.017793708 s Max: 1.017793708 s
	}
	WaitTime: {
		Avg: 5.342901 ms Min: 5.342901 ms Max: 5.342901 ms
	}

This is the code:

	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"127.0.0.1:1024"},
		Topic:    "test1",
		Balancer: &kafka.LeastBytes{},
	})

	if err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
	); err != nil {
		fmt.Println("write error:", err)
	}

	fmt.Printf("%+v\n", w.Stats())

	w.Close()

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 17 (3 by maintainers)

Most upvoted comments

I made a batch write test, writing the message 100 times. This is the code:

	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"127.0.0.1:1024"},
		Topic:    "test1",
		Balancer: &kafka.LeastBytes{},
	})

	start := time.Now()

	for i := 0; i < 100; i++ {
		w.WriteMessages(context.Background(),
			kafka.Message{
				Key:   []byte("Key-A"),
				Value: []byte("Hello World!"),
			},
		)
	}
	w.Close()

	end := time.Now()
	diff := end.Sub(start)
	fmt.Println(diff)

It takes 1m40.3175299s, i.e. roughly one second per message.

Is my test right?

(3 brokers, 2 partitions, 2 replicas)

You can set BatchTimeout in WriterConfig to avoid the long wait (the default is one second; lowering it trades batching efficiency for latency):

    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers:      []string{"localhost:9092"},
        Topic:        "helloworld",
        Balancer:     &kafka.LeastBytes{},
        BatchTimeout: 10 * time.Millisecond,
    })

This is a stupid design. WriteMessages should batch internally instead of us having to batch it manually. Intuition would make it seem like you can call WriteMessages in a loop and kafka-go would batch for you and send them out when it received the batch amount or the timeout. This is how all other libraries I’ve used work.

Instead, we have to create an entire batching system to call WriteMessages which then batches as well? You might as well remove the entire batch system from this library since you have to create one on top of it.

Same issue here, but I think I have figured out the real reason why it is so slow.

Here is the code snippet of this method:

func (w *writer) run() {
	defer w.join.Done()

	batchTimer := time.NewTimer(0)
	<-batchTimer.C
	batchTimerRunning := false
	defer batchTimer.Stop()

	var conn *Conn
	var done bool
	var batch = make([]Message, 0, w.batchSize)
	var resch = make([](chan<- error), 0, w.batchSize)
	var lastMsg writerMessage
	var batchSizeBytes int

	defer func() {
		if conn != nil {
			conn.Close()
		}
	}()

	for !done {
		var mustFlush bool
	// lastMsg gets set when the next message would put the batch over the maxMessageBytes limit.
	// If a lastMsg exists we need to add it to the batch so we don't lose it.
		if len(lastMsg.msg.Value) != 0 {
			batch = append(batch, lastMsg.msg)
			if lastMsg.res != nil {
				resch = append(resch, lastMsg.res)
			}
			batchSizeBytes += int(lastMsg.msg.message().size())
			lastMsg = writerMessage{}
			if !batchTimerRunning {
				batchTimer.Reset(w.batchTimeout)
				batchTimerRunning = true
			}
		}
		select {
		case wm, ok := <-w.msgs:
			if !ok {
				done, mustFlush = true, true
			} else {
				// ********** CHECK SIZE ***********
				if int(wm.msg.message().size())+batchSizeBytes > w.maxMessageBytes {
					// If the size of the current message puts us over the maxMessageBytes limit,
					// store the message but don't send it in this batch.
					mustFlush = true
					lastMsg = wm
					break
				}
				batch = append(batch, wm.msg)
				if wm.res != nil {
					resch = append(resch, wm.res)
				}
				batchSizeBytes += int(wm.msg.message().size())
				mustFlush = len(batch) >= w.batchSize || batchSizeBytes >= w.maxMessageBytes
			}
			if !batchTimerRunning {
				batchTimer.Reset(w.batchTimeout)
				batchTimerRunning = true
			}

		case <-batchTimer.C:
			mustFlush = true
			batchTimerRunning = false
		}

		// ******** WAIT TO FLUSH *********
		if mustFlush {
			w.stats.batchSizeBytes.observe(int64(batchSizeBytes))
			if batchTimerRunning {
				if stopped := batchTimer.Stop(); !stopped {
					<-batchTimer.C
				}
				batchTimerRunning = false
			}
			if len(batch) == 0 {
				continue
			}
			var err error
			if conn, err = w.write(conn, batch, resch); err != nil {
				if conn != nil {
					conn.Close()
					conn = nil
				}
			}
			for i := range batch {
				batch[i] = Message{}
			}

			for i := range resch {
				resch[i] = nil
			}
			batch = batch[:0]
			resch = resch[:0]
			batchSizeBytes = 0
		}
	}
}

The default BatchTimeout is one second:

	if config.BatchTimeout == 0 {
		config.BatchTimeout = 1 * time.Second
	}

So if the messages are not large enough to fill a batch, every WriteMessages call has to wait at least one second!

@shukyoo That’s why WriteTime is nearly one second:

Avg: 1.017793708 s Min: 1.017793708 s Max: 1.017793708 s

I am confused by this design too, and I ran into the 1-second problem as well. If this is by design, I'd choose another library.

I think the WriteMessages() method does too much. It is also not clear how it is supposed to be used, even when sending messages in chunks.

A more sensible approach IMO would be to add new methods as follows:

// WriteBuffered saves the messages in a buffer, flushing every BufferSize messages.
// If there's room in the buffer, no messages are written. The returned error is the first
// error returned by the Flush method.
func (w *BufferedWriter) WriteBuffered(ctx context.Context, messages ...kafka.Message) error {
	// [...]
}

// Flush sends and then empties the content of the buffer.
func (w *BufferedWriter) Flush(ctx context.Context) error {
	// [...]
}

Building up on the API above, we could add another method to handle timeouts:

// WriteBufferedAndFlushAfter works as WriteBuffered but also calls Flush with the same context after the given delay.
func (w *FlushingWriter) WriteBufferedAndFlushAfter(ctx context.Context, delay time.Duration, messages ...kafka.Message) chan error {
	// [...]
}

I have a working implementation with tests and could contribute if you agree @achille-roussel. @arianitu it would be nice to have your feedback as well.