confluent-kafka-go: Slow Producer and huge memory leak
I am sending messages to Kafka using this code:
deliveryChan := make(chan kafka.Event)
topic := viper.GetString("kafka.forwardtopic")
kf.Producer.Produce(&kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
	Value:          []byte(bulk),
}, deliveryChan)
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
	logger.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
}
close(deliveryChan)
However, this is extremely slow; a single send sometimes takes one or even two seconds. I suspect it blocks on:
e := <-deliveryChan
because it is waiting for the Kafka acknowledgement.
So I tried the same code without the delivery channel, since I don't really need the acknowledgement:
//deliveryChan := make(chan kafka.Event)
topic := viper.GetString("kafka.forwardtopic")
kf.Producer.Produce(&kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
	Value:          []byte(bulk),
}, nil)
//e := <-deliveryChan
//m := e.(*kafka.Message)
//if m.TopicPartition.Error != nil {
//	logger.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
//}
//close(deliveryChan)
But this creates a huge memory leak and my app crashes after a few minutes.
About this issue
- State: closed
- Created 7 years ago
- Comments: 55 (27 by maintainers)
In your first example you are effectively implementing a sync producer: it sends one message and then waits for an ack (reply) from the broker. This is a complete performance killer, since throughput is limited by the network round-trip time plus broker processing. It is made worse by the default batching behaviour of the client, which buffers messages for up to queue.buffering.max.ms before sending them to the broker. This value defaults to 1000 ms, which seems in line with what you are seeing.
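For illustration only, a minimal sketch of how that buffering time could be lowered when creating the producer; the broker address and the 5 ms value are placeholders, and the snippet assumes the confluent-kafka-go kafka package and the standard log package are imported:
// Sketch: queue.buffering.max.ms controls how long messages are buffered
// before a batch is sent to the broker; lowering it reduces per-message latency.
p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers":      "localhost:9092", // placeholder broker
	"queue.buffering.max.ms": 5,                // example value, not a recommendation
})
if err != nil {
	log.Fatalf("failed to create producer: %v", err)
}
defer p.Close()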
What you typically do is call Produce() and then move on to producing new messages, checking the delivery results on your deliveryChan(s) from goroutines or similar. That way throughput is not limited by the broker round-trip time.
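A minimal sketch of that async pattern as a self-contained program; the broker address, topic name, payload, and message count are placeholders:
// Async produce sketch: Produce() only enqueues, and a single goroutine
// drains the shared delivery channel to handle delivery reports.
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}
	defer p.Close()

	// One shared delivery channel, read by one goroutine.
	deliveryChan := make(chan kafka.Event, 10000)
	go func() {
		for e := range deliveryChan {
			if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
				log.Printf("delivery failed: %v", m.TopicPartition.Error)
			}
		}
	}()

	topic := "example-topic" // placeholder
	for i := 0; i < 100000; i++ {
		// Produce() does not wait for the broker; it returns as soon as
		// the message is queued locally.
		err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte("payload"), // placeholder
		}, deliveryChan)
		if err != nil {
			log.Printf("enqueue failed: %v", err) // e.g. local queue is full
		}
	}

	// Wait for outstanding deliveries before shutting down.
	p.Flush(15 * 1000)
}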
In your second example you've cut out the return path / delivery channel completely, so it will run at full speed until the default delivery report channel fills up, and that is also why you are seeing a "memory leak". There is no actual memory leak; the client simply expects you to read delivery reports off the default producer.Events channel.
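For the nil-delivery-channel case, a minimal sketch of draining the default Events() channel (assuming a producer p created as above):
// Drain the default Events() channel so delivery reports do not pile up
// internally when Produce() is called with a nil delivery channel.
go func() {
	for e := range p.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				log.Printf("delivery failed: %v", ev.TopicPartition.Error)
			}
		case kafka.Error:
			log.Printf("producer error: %v", ev)
		}
	}
}()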
If you are completely uninterested in delivery state (which you might want to reconsider), you also need to disable the default delivery reports by setting go.delivery.reports to false. See the documentation here for more info: http://docs.confluent.io/3.1.2/clients/confluent-kafka-go/index.html#hdr-Producer
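A minimal sketch of that setting (the broker address is a placeholder):
// Disable the Go client's default delivery reports entirely if delivery
// state really is not needed.
p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers":   "localhost:9092", // placeholder broker
	"go.delivery.reports": false,
})
if err != nil {
	log.Fatalf("failed to create producer: %v", err)
}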
Hi everyone, it seems this issue has been closed for a while, but… I have actually tried the code @edenhill provided and I still have the same memory leak issue (observed with pprof). I also tried the code from the official documentation, same issue. Has anyone managed to produce messages without a memory leak? 😦
Thanks in advance
@liubin The problem with these simple sync interfaces is that they trick new developers into not thinking about what happens when the network starts acting up, a broker becomes slow, etc. A sync interface will work okay as long as latency is low and there are no problems, but as soon as a produce request can't be served immediately the application will hang, possibly backpressuring its input source. This might be okay in some situations, and it is easy enough to wrap the async produce interface to make it sync (temporary delivery channel + blocking read on that channel after Produce()), but in most situations it is more important to maintain some degree of throughput and not stall the calling application.
There’s some more information here: https://github.com/edenhill/librdkafka/wiki/FAQ#why-is-there-no-sync-produce-interface
And here's an example of how to make a sync produce call:
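A minimal sketch of such a wrapper, along the lines of the temporary delivery channel plus blocking read described above (illustrative names; assumes the confluent-kafka-go kafka package is imported):
// syncProduce produces one message and blocks until its delivery report
// arrives, returning any delivery error. Illustrative sketch only.
func syncProduce(p *kafka.Producer, topic string, value []byte) error {
	deliveryChan := make(chan kafka.Event, 1)

	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          value,
	}, deliveryChan)
	if err != nil {
		return err
	}

	// Block until the broker ack (or failure) comes back for this message.
	e := <-deliveryChan
	return e.(*kafka.Message).TopicPartition.Error
}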
I don't think the leak is in my app, since sending nil on the Producer channel works fine with no leaks.
I will reduce queued.max.messages.kbytes and check with pprof where the leak is coming from.