sarama: producer write broken pipe
Versions
V1.24.1
| Sarama | Kafka | Go |
|---|---|---|
| 1.24.1 | Kafka 2.1.1 | 1.13.1 |
Configuration
What configuration values are you using for Sarama and Kafka?
```go
conf := sarama.NewConfig()
conf.Version = sarama.V2_0_0_0
conf.Producer.RequiredAcks = sarama.WaitForLocal
conf.ChannelBufferSize = 1024
conf.Net.KeepAlive = 30 * time.Second
```
Logs
```
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [closing] because write tcp xxxxx:59382->xxxx:9092: write: broken pipe\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [closing] because write tcp xxxx:33336->xxxx:9092: write: broken pipe\n"
time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [retrying-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 abandoning broker 3\n"
time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 input chan closed\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 shut down\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [retrying-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 abandoning broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [retrying-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 abandoning broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 input chan closed\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 shut down\n"
time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.transfer] from broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [msg.republish] from broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.hit] from broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092"
time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific."
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 starting up\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [open] on metric.transfer/0\n"
time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092"
time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific."
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 selected broker 3\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [flushing-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [normal]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 starting up\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on metric.ruleengine.hit/0\n"
time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 selected broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 selected broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on msg.republish/1\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [flushing-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [normal]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [flushing-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [normal]\n"
time="2019-12-25T09:15:52Z" level=info msg="Connected to broker at xxxx:9092 (registered as #2)\n"
time="2019-12-25T09:15:53Z" level=info msg="Connected to broker at xxxx:9092 (registered as #3)\n"
```
Problem Description
After 10 minutes, writes on the producer's socket fail with `write: broken pipe`.
About this issue
- State: open
- Created 5 years ago
- Reactions: 3
- Comments: 24 (1 by maintainers)
Commits related to this issue
- Create a new ClusterAdmin at each loop - Fix sarama issue: https://github.com/Shopify/sarama/issues/1565 Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com> — committed to pierDipi/eventing-kafka-broker by pierDipi 4 years ago
- Create a new ClusterAdmin at each loop (#63) * Create a new ClusterAdmin at each loop - Fix sarama issue: https://github.com/Shopify/sarama/issues/1565 Signed-off-by: Pierangelo Di Pilato <pierange... — committed to knative-extensions/eventing-kafka-broker by pierDipi 4 years ago
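The workaround in those commits stops reusing one long-lived `ClusterAdmin` and creates a new one on every loop iteration. Below is a minimal sketch of that pattern. It assumes a hypothetical `adminClient` interface, which stands in for a long-lived admin client and is not Sarama's `ClusterAdmin` API. A fake implementation lets it run without a broker.

```go
package main

import "fmt"

// adminClient is a hypothetical stand-in for a long-lived admin client.
type adminClient interface {
	ListTopics() ([]string, error)
	Close() error
}

// reconcileOnce opens a fresh client, uses it once and always closes it,
// so no connection is kept open (and idle) between loop iterations.
func reconcileOnce(newAdmin func() (adminClient, error)) ([]string, error) {
	a, err := newAdmin()
	if err != nil {
		return nil, err
	}
	defer a.Close()
	return a.ListTopics()
}

// fakeAdmin lets the sketch run without a broker; it counts Close calls.
type fakeAdmin struct{ closed *int }

func (f *fakeAdmin) ListTopics() ([]string, error) { return []string{"metric.transfer"}, nil }
func (f *fakeAdmin) Close() error                  { *f.closed++; return nil }

func main() {
	opened, closed := 0, 0
	newAdmin := func() (adminClient, error) {
		opened++
		return &fakeAdmin{closed: &closed}, nil
	}
	for i := 0; i < 3; i++ { // e.g. one iteration per reconcile pass
		topics, err := reconcileOnce(newAdmin)
		fmt.Println(i, topics, err)
	}
	fmt.Println("opened:", opened, "closed:", closed)
}
```

The trade-off is one connection setup per iteration in exchange for never holding a connection that the broker may have dropped while it sat idle.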
Yes, this is a bug. ~I believe the issue is that the producer dispatcher func simply iterates over the `Input` channel, blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the meantime, the producer currently won't realise until it next tries to send a message.~

Update: hmm, after looking into this some more, the `brokerProducer` should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation.
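The comment describes two behaviours. The first is a loop that blocks on `Input` and only notices a dead connection on its next write. The second is a broker producer that re-establishes the connection transparently. The sketch below combines both into one minimal, standard-library-only pattern. It assumes a hypothetical `conn` interface and `dialFunc` factory. Neither is Sarama's API, and this is not Sarama's actual `brokerProducer` code.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for a broker connection.
type conn interface {
	Write(msg string) error
	Close() error
}

// dialFunc is a hypothetical factory that opens a fresh connection.
type dialFunc func() (conn, error)

// dispatch drains input, blocking between messages. A connection that died
// while idle is only discovered when the next Write fails; dispatch then
// closes it, dials a replacement and retries that message once.
func dispatch(input <-chan string, dial dialFunc) (sent int, err error) {
	var c conn
	for msg := range input {
		if c == nil {
			if c, err = dial(); err != nil {
				return sent, err
			}
		}
		if werr := c.Write(msg); werr != nil {
			c.Close() // drop the dead connection
			if c, err = dial(); err != nil {
				return sent, err
			}
			if werr = c.Write(msg); werr != nil {
				c.Close()
				return sent, werr
			}
		}
		sent++
	}
	if c != nil {
		c.Close()
	}
	return sent, nil
}

var errBrokenPipe = errors.New("write: broken pipe")

// fakeConn accepts a fixed number of writes, then behaves like a socket
// whose peer has closed it.
type fakeConn struct {
	remaining int
	delivered *[]string
}

func (f *fakeConn) Write(msg string) error {
	if f.remaining == 0 {
		return errBrokenPipe
	}
	f.remaining--
	*f.delivered = append(*f.delivered, msg)
	return nil
}

func (f *fakeConn) Close() error { return nil }

func main() {
	var delivered []string
	dials := 0
	dial := func() (conn, error) {
		dials++
		if dials == 1 { // the first connection "dies" after two writes
			return &fakeConn{remaining: 2, delivered: &delivered}, nil
		}
		return &fakeConn{remaining: 100, delivered: &delivered}, nil
	}
	input := make(chan string, 4)
	for _, m := range []string{"a", "b", "c", "d"} {
		input <- m
	}
	close(input)
	sent, err := dispatch(input, dial)
	fmt.Println(sent, err, dials, delivered)
}
```

Running it prints `4 <nil> 2 [a b c d]`: the third write hits the "broken pipe", and the message is redelivered on a second connection. The sketch retries a message only once. Sarama's real retry behaviour is governed by its own producer settings and is not modelled here.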
Interested in this issue. I'm experiencing it with:
- Sarama version: v1.28.0
- Go version: 1.15