sarama: producer write broken pipe

Versions

  • Sarama: v1.24.1
  • Kafka: 2.1.1
  • Go: 1.13.1

Configuration

What configuration values are you using for Sarama and Kafka?

conf := sarama.NewConfig()
conf.Version = sarama.V2_0_0_0
conf.Producer.RequiredAcks = sarama.WaitForLocal
conf.ChannelBufferSize = 1024
conf.Net.KeepAlive = 30 * time.Second

Logs

time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [closing] because write tcp xxxxx:59382->xxxx:9092: write: broken pipe\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [closing] because write tcp xxxx:33336->xxxx:9092: write: broken pipe\n"
time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [retrying-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 abandoning broker 3\n"
time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 input chan closed\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 shut down\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [retrying-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 abandoning broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [retrying-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 abandoning broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 input chan closed\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 shut down\n"
time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.transfer] from broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [msg.republish] from broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.hit] from broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092"
time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific."
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 starting up\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [open] on metric.transfer/0\n"
time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092"
time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific."
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 selected broker 3\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [flushing-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [normal]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 starting up\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on metric.ruleengine.hit/0\n"
time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 selected broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 selected broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on msg.republish/1\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [flushing-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [normal]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [flushing-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [normal]\n"
time="2019-12-25T09:15:52Z" level=info msg="Connected to broker at xxxx:9092 (registered as #2)\n"
time="2019-12-25T09:15:53Z" level=info msg="Connected to broker at xxxx:9092 (registered as #3)\n"

Problem Description

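After 10 minutes, writes on the producer's broker sockets fail with "write: broken pipe". Sarama then closes the affected broker connections, refetches metadata for the affected topics, and reconnects, as the logs above show.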

About this issue

  • State: open
  • Created 5 years ago
  • Reactions: 3
  • Comments: 24 (1 by maintainers)

Most upvoted comments

Yes, this is a bug. ~I believe the issue is that the producer dispatcher func simply iterates over the Input channel, blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the meantime, the producer currently won't realise it until it next tries to send a message.~

Update: hmm, after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation.

Interested in this issue. I'm experiencing it with:

  • Sarama version: v1.28.0
  • Go version: 1.15