sarama: AsyncProducer `Close` blocked for infinity

Versions

Please specify real version numbers or git SHAs, not just “Latest” since that changes fairly regularly.

Sarama Kafka Go
1.27.2 3.0.0 1.7.5
Configuration

What configuration values are you using for Sarama and Kafka?


	config.Metadata.Retry.Max = 3
	config.Metadata.Retry.Backoff = 250 * time.Millisecond
	config.Metadata.Timeout = 1 * time.Minute

	// Admin.Retry take effect on `ClusterAdmin` related operations,
	// only `CreateTopic` for cdc now. Just use default values.
	config.Admin.Retry.Max = 5
	config.Admin.Retry.Backoff = 100 * time.Millisecond
	config.Admin.Timeout = 3 * time.Second

	config.Producer.Retry.Max = 3
	config.Producer.Retry.Backoff = 100 * time.Millisecond

	config.Producer.Partitioner = sarama.NewManualPartitioner
	config.Producer.MaxMessageBytes = c.MaxMessageBytes
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Producer.RequiredAcks = sarama.WaitForAll

Logs

We are testing sarama in an extremely rare scenario:

  • producer can send request to the 1 machine Kafka cluster, but cannot get a response
  • Kill -s STOP the kafka broker process, this will make the TCP connection remain, but will not send a response.
  • Close the producer by call asyncProducer.Close()

we do not have a log from sarama, but have grabbed some goroutine stack, like the following one, it looks blocked on trying to receive a response from the broker. image

Our purpose is that when try to close the producer, it should not be blocked for a long time, instead of return as soon as possible.

WechatIMG3202

But the reality as shown in the picture above, 33 messages failed to deliver after 38minutes, and it was after the process resume by kill -s CONT

Problem Description

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Reactions: 7
  • Comments: 15 (2 by maintainers)

Most upvoted comments

I believe there are two issues discussed here with similar outcomes.

Default configuration of the AsyncProducer can lead to long delays

By default the AsyncProducer has retries and backoff enabled that can lead to long delays before failing. Such long delays are often a combination of multiple records being buffered and the current in flight request being “stuck” reading from a broker. The default timeout when reading from a TCP socket is 30 seconds (config.Net.ReadTimeout), so it might take up to 30 seconds to notice that the connection to a given broker is broken (assuming the connection was not properly closed on both ends). Then you might see a 100ms backoff per record (if you have 600 pending records going to a given partition, you might be waiting up to 1 minute) before you can replay all those records. Also if the target broker is not reachable anymore, you might also hit another 30 seconds delay (config.Net.DialTimeout) before triggering another retry (up to 3 by default).

So depending on how the AsyncProducer is configured and the type of network error, it might look the producer is stuck but it is actually mostly idle because of the retry logic.

See #1359 for yet another example on how it can take up to 4 minutes to fail trying to connect to a 2 brokers cluster.

expected feature, when try to close the asyncProducer, just drop all buffered message, and response immediately.

Unfortunately because of how the pipeline logic works, I don’t think the shutdown message (created when closing the AsyncProducer) is handled till all the queued records are processed. That is existing retries need to finish (which is often what is taking time) but “new” retries will be cancelled. To fail faster, you could disable retries and handle them yourself:

config.Net.DialTimeout = <short-enough-for-your-need>
config.Net.ReadTimeout = <short-enough-for-your-need>
config.Metadata.Timeout = <short-enough-for-your-need>
config.Producer.Retry.Max = 0
config.Producer.Retry.Backoff = 0

Deadlock on retries (specific Sarama 1.31.0)

The other issue is indeed a deadlock when a brokerProducer is trying to call Close on its broker inside the callback of broker.AsyncProduce, this happens when dealing when receiving failed Produce response while sending concurrently another Produce request. Such callback is called from the responseReceiver goroutine but the Close receiver blocks:

  • if there is an extra Produce request that reaches the maximum number of in flights requests (config.Net.MaxOpenRequests) by trying to acquire the b.lock.
  • till responseReceiver goroutine is done (by reading from b.done).

This is a regression from #2094 and I should have a fix with a simple unit test for that soon. I believe #2129 describes that regression as well and I don’t think it is specific to the SyncProducer.

github.com/Shopify/sarama.(*asyncProducer).dispatcher(0x1400012c310)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:330 +0x8c
github.com/Shopify/sarama.withRecover(0x140019c8360)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/sarama@v1.27.2/async_producer.go:166 +0x270

goroutine 404 [chan receive, 2 minutes]:
github.com/Shopify/sarama.(*asyncProducer).Close(0x1400012c310)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:307 +0x154
github.com/pingcap/tiflow/cdc/sink/producer/kafka.(*kafkaSaramaProducer).Close(0x14000e86dc0)
	github.com/pingcap/tiflow/cdc/sink/producer/kafka/kafka.go:217 +0x11c
github.com/pingcap/tiflow/cdc/sink.(*mqSink).Close(0x140008a4600, {0x103c77048, 0x14005410d00})
	github.com/pingcap/tiflow/cdc/sink/mq.go:309 +0x34
github.com/pingcap/tiflow/cdc/sink.(*Manager).Close(0x140027a7800, {0x103c77048, 0x14005410d00})
	github.com/pingcap/tiflow/cdc/sink/manager.go:92 +0x278
github.com/pingcap/tiflow/cdc/processor.(*processor).Close(0x140023ce000)
	github.com/pingcap/tiflow/cdc/processor/processor.go:1068 +0x774
github.com/pingcap/tiflow/cdc/processor.(*Manager).closeProcessor(0x14000f5e020, {0x14000fda2da, 0x24})
	github.com/pingcap/tiflow/cdc/processor/manager.go:132 +0x68
github.com/pingcap/tiflow/cdc/processor.(*Manager).Tick(0x14000f5e020, {0x12ec19bb0, 0x14000ec0e80}, {0x103c31300, 0x14000a78c80})
	github.com/pingcap/tiflow/cdc/processor/manager.go:112 +0x540
github.com/pingcap/tiflow/pkg/orchestrator.(*EtcdWorker).Run(0x140009fbc00, {0x12ec19bb0, 0x14000ec0e80}, 0x14000effcb0, 0x5f5e100, {0x14001b30020, 0xe}, {0x102db3ae6, 0x9})
	github.com/pingcap/tiflow/pkg/orchestrator/etcd_worker.go:239 +0xa04
github.com/pingcap/tiflow/cdc/capture.(*Capture).runEtcdWorker(0x1400090d2b0, {0x103caae20, 0x14000ec0e80}, {0x103c17400, 0x14000f5e020}, {0x103c31300, 0x14000a78c80}, 0x5f5e100, {0x102db3ae6, 0x9})
	github.com/pingcap/tiflow/cdc/capture/capture.go:456 +0x120
github.com/pingcap/tiflow/cdc/capture.(*Capture).run.func3(0x14000994210, 0x1400090d2b0, {0x103caae20, 0x14000ec0e80}, 0x140006e61c0)
	github.com/pingcap/tiflow/cdc/capture/capture.go:316 +0x2d4
created by github.com/pingcap/tiflow/cdc/capture.(*Capture).run
	github.com/pingcap/tiflow/cdc/capture/capture.go:296 +0x478

goroutine 1785 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).Close.func1()
	github.com/Shopify/sarama@v1.27.2/async_producer.go:300 +0x48
github.com/Shopify/sarama.withRecover(0x140073d4ab0)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).Close
	github.com/Shopify/sarama@v1.27.2/async_producer.go:299 +0x9c

goroutine 929 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).retryHandler(0x1400012c310)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:1034 +0x148
github.com/Shopify/sarama.withRecover(0x140019c8370)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/sarama@v1.27.2/async_producer.go:167 +0x2d4

goroutine 670 [chan receive]:
github.com/rcrowley/go-metrics.(*meterArbiter).tick(0x10679ea20)
	github.com/rcrowley/go-metrics@v0.0.0-20200313005456-10cdbea86bc0/meter.go:239 +0x34
created by github.com/rcrowley/go-metrics.NewMeter
	github.com/rcrowley/go-metrics@v0.0.0-20200313005456-10cdbea86bc0/meter.go:46 +0xe4

goroutine 303 [select, 4 minutes]:
github.com/Shopify/sarama.(*client).backgroundMetadataUpdater(0x14000c2a090)
	github.com/Shopify/sarama@v1.27.2/client.go:809 +0x104
github.com/Shopify/sarama.withRecover(0x140019c8340)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.NewClient
	github.com/Shopify/sarama@v1.27.2/client.go:180 +0x3f8

goroutine 1784 [semacquire, 12 minutes]:
sync.runtime_Semacquire(0x1400012c350)
	runtime/sema.go:56 +0x38
sync.(*WaitGroup).Wait(0x1400012c348)
	sync/waitgroup.go:130 +0xa4
github.com/Shopify/sarama.(*asyncProducer).shutdown(0x1400012c310)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:1059 +0xd8
github.com/Shopify/sarama.withRecover(0x140073d4aa0)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).AsyncClose
	github.com/Shopify/sarama@v1.27.2/async_producer.go:321 +0x80

goroutine 948 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*syncProducer).handleSuccesses(0x140026ae4c8)
	github.com/Shopify/sarama@v1.27.2/sync_producer.go:131 +0x94
github.com/Shopify/sarama.withRecover(0x14001b871c0)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.newSyncProducerFromAsyncProducer
	github.com/Shopify/sarama@v1.27.2/sync_producer.go:76 +0xd0

goroutine 947 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*asyncProducer).retryHandler(0x14000555500)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:1034 +0x148
github.com/Shopify/sarama.withRecover(0x14001b871b0)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/sarama@v1.27.2/async_producer.go:167 +0x2d4

goroutine 946 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*asyncProducer).dispatcher(0x14000555500)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:330 +0x8c
github.com/Shopify/sarama.withRecover(0x14001b871a0)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/sarama@v1.27.2/async_producer.go:166 +0x270

goroutine 945 [select, 4 minutes]:
github.com/Shopify/sarama.(*client).backgroundMetadataUpdater(0x14000c2a2d0)
	github.com/Shopify/sarama@v1.27.2/client.go:809 +0x104
github.com/Shopify/sarama.withRecover(0x14001b87180)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.NewClient
	github.com/Shopify/sarama@v1.27.2/client.go:180 +0x3f8

goroutine 949 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*syncProducer).handleErrors(0x140026ae4c8)
	github.com/Shopify/sarama@v1.27.2/sync_producer.go:139 +0x98
github.com/Shopify/sarama.withRecover(0x14001b871d0)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.newSyncProducerFromAsyncProducer
	github.com/Shopify/sarama@v1.27.2/sync_producer.go:77 +0x134

goroutine 1125 [select, 2 minutes]:
github.com/Shopify/sarama.(*Broker).sendAndReceive(0x14000d5d180, {0x103c8c220, 0x140021afe30}, {0x103c8c268, 0x140059cbd10})
	github.com/Shopify/sarama@v1.27.2/broker.go:774 +0xf4
github.com/Shopify/sarama.(*Broker).GetMetadata(0x14000d5d180, 0x140021afe30)
	github.com/Shopify/sarama@v1.27.2/broker.go:283 +0x64
github.com/Shopify/sarama.(*client).tryRefreshMetadata(0x14000c2a090, {0x140393807b0, 0x1, 0x1}, 0x2, {0xc0740bc096c208d8, 0xce70638466, 0x10679f9c0})
	github.com/Shopify/sarama@v1.27.2/client.go:880 +0x450
github.com/Shopify/sarama.(*client).tryRefreshMetadata.func2({0x103c15140, 0x1400087ece0})
	github.com/Shopify/sarama@v1.27.2/client.go:859 +0x1f0
github.com/Shopify/sarama.(*client).tryRefreshMetadata(0x14000c2a090, {0x140393807b0, 0x1, 0x1}, 0x3, {0xc0740bc096c208d8, 0xce70638466, 0x10679f9c0})
	github.com/Shopify/sarama@v1.27.2/client.go:927 +0xac0
github.com/Shopify/sarama.(*client).RefreshMetadata(0x14000c2a090, {0x140393807b0, 0x1, 0x1})
	github.com/Shopify/sarama@v1.27.2/client.go:473 +0xfc
github.com/Shopify/sarama.(*partitionProducer).updateLeader.func1()
	github.com/Shopify/sarama@v1.27.2/async_producer.go:657 +0x8c
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1(0x14002c39e68, 0x14002c39eb8)
	github.com/eapache/go-resiliency@v1.2.0/breaker/breaker.go:85 +0x54
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork(0x14002518050, 0x0, 0x14002c39eb8)
	github.com/eapache/go-resiliency@v1.2.0/breaker/breaker.go:86 +0x34
github.com/eapache/go-resiliency/breaker.(*Breaker).Run(...)
	github.com/eapache/go-resiliency@v1.2.0/breaker/breaker.go:55
github.com/Shopify/sarama.(*partitionProducer).updateLeader(0x1400408e060)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:656 +0x74
github.com/Shopify/sarama.(*partitionProducer).dispatch(0x1400408e060)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:589 +0x55c
github.com/Shopify/sarama.withRecover(0x1400557c7c0)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer
	github.com/Shopify/sarama@v1.27.2/async_producer.go:513 +0x200

goroutine 3458 [IO wait, 2 minutes]:
internal/poll.runtime_pollWait(0x12ea13d18, 0x72)
	runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x14003396c18, 0x72, 0x0)
	internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
	internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14003396c00, {0x140022ed000, 0x1000, 0x1000})
	internal/poll/fd_unix.go:167 +0x1dc
net.(*netFD).Read(0x14003396c00, {0x140022ed000, 0x1000, 0x1000})
	net/fd_posix.go:56 +0x44
net.(*conn).Read(0x140027d8c30, {0x140022ed000, 0x1000, 0x1000})
	net/net.go:183 +0x4c
bufio.(*Reader).Read(0x1400387fa40, {0x14000b08128, 0x8, 0x8})
	bufio/bufio.go:227 +0x20c
github.com/Shopify/sarama.(*bufConn).Read(0x14005305d10, {0x14000b08128, 0x8, 0x8})
	github.com/Shopify/sarama@v1.27.2/utils.go:107 +0x44
io.ReadAtLeast({0x12e9cde50, 0x14005305d10}, {0x14000b08128, 0x8, 0x8}, 0x8)
	io/io.go:328 +0xa0
io.ReadFull(...)
	io/io.go:347
github.com/Shopify/sarama.(*Broker).readFull(0x14000d5d180, {0x14000b08128, 0x8, 0x8})
	github.com/Shopify/sarama@v1.27.2/broker.go:702 +0xe0
github.com/Shopify/sarama.(*Broker).responseReceiver(0x14000d5d180)
	github.com/Shopify/sarama@v1.27.2/broker.go:858 +0x148
github.com/Shopify/sarama.withRecover(0x14000fac070)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*Broker).Open.func1
	github.com/Shopify/sarama@v1.27.2/broker.go:211 +0xbac

goroutine 3277 [IO wait, 2 minutes]:
internal/poll.runtime_pollWait(0x12ea141a0, 0x72)
	runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x14000742018, 0x72, 0x0)
	internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
	internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14000742000, {0x1400248f000, 0x1000, 0x1000})
	internal/poll/fd_unix.go:167 +0x1dc
net.(*netFD).Read(0x14000742000, {0x1400248f000, 0x1000, 0x1000})
	net/fd_posix.go:56 +0x44
net.(*conn).Read(0x140506829f0, {0x1400248f000, 0x1000, 0x1000})
	net/net.go:183 +0x4c
bufio.(*Reader).Read(0x1400408f1a0, {0x14002d0daf0, 0x8, 0x8})
	bufio/bufio.go:227 +0x20c
github.com/Shopify/sarama.(*bufConn).Read(0x14006ec6528, {0x14002d0daf0, 0x8, 0x8})
	github.com/Shopify/sarama@v1.27.2/utils.go:107 +0x44
io.ReadAtLeast({0x12e9cde50, 0x14006ec6528}, {0x14002d0daf0, 0x8, 0x8}, 0x8)
	io/io.go:328 +0xa0
io.ReadFull(...)
	io/io.go:347
github.com/Shopify/sarama.(*Broker).readFull(0x14000d5c000, {0x14002d0daf0, 0x8, 0x8})
	github.com/Shopify/sarama@v1.27.2/broker.go:702 +0xe0
github.com/Shopify/sarama.(*Broker).responseReceiver(0x14000d5c000)
	github.com/Shopify/sarama@v1.27.2/broker.go:858 +0x148
github.com/Shopify/sarama.withRecover(0x140078cd9e0)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*Broker).Open.func1
	github.com/Shopify/sarama@v1.27.2/broker.go:211 +0xbac

goroutine 1278 [chan send, 2 minutes]:
github.com/Shopify/sarama.(*topicProducer).dispatch(0x14000d2ea80)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:426 +0x154
github.com/Shopify/sarama.withRecover(0x14039381830)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newTopicProducer
	github.com/Shopify/sarama@v1.27.2/async_producer.go:407 +0x1fc

goroutine 1279 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*partitionProducer).dispatch(0x140025bcd80)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:546 +0x198
github.com/Shopify/sarama.withRecover(0x14039381840)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer
	github.com/Shopify/sarama@v1.27.2/async_producer.go:513 +0x200

goroutine 1409 [select, 12 minutes]:
github.com/Shopify/sarama.(*brokerProducer).run(0x14003b530a0)
	github.com/Shopify/sarama@v1.27.2/async_producer.go:747 +0x17c
github.com/Shopify/sarama.withRecover(0x14039381860)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
	github.com/Shopify/sarama@v1.27.2/async_producer.go:691 +0x25c

goroutine 1410 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1()
	github.com/Shopify/sarama@v1.27.2/async_producer.go:695 +0x68
github.com/Shopify/sarama.withRecover(0x14000e72e00)
	github.com/Shopify/sarama@v1.27.2/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
	github.com/Shopify/sarama@v1.27.2/async_producer.go:694 +0x2e8

We’ve seen this deadlock repeatedly on several Kafka clusters as soon as we picked up v1.31.x, and we haven’t seen it since we rolled out back our vendored deps to v1.30.1, so there is definitely a bug in the code, it’s not a problem due to a dead broker (our brokers are fine).

I think I came across the same issue with Sarama 1.31.0 kafka 1.1.0 with golang 1.17.5 And below is my extracted goroutine stack

goroutine profile: total 334


45 @ 0x4383b6 0x40640c 0x405e78 0xd38906 0xd906de 0x468c21
#	0xd38905	github.com/Shopify/sarama.(*partitionProducer).dispatch+0x1a5	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:546
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43


12 @ 0x4383b6 0x448052 0xd3a331 0xd906de 0x468c21
#	0xd3a330	github.com/Shopify/sarama.(*brokerProducer).run+0x190	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:765
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d		/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43


9 @ 0x4383b6 0x40640c 0x405e38 0xd8ecb0 0xd9f542 0xf0f2d1 0xf152a6 0xf152b4 0xf27a05 0x468c21
#	0xd8ecaf	github.com/Shopify/sarama.(*syncProducer).SendMessage+0x8f						/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/sync_producer.go:96
#   ..........my project code.......
#   ..........my project code.......
#   ..........my project code.......

9 @ 0x4383b6 0x40640c 0x405e78 0xd42a14 0xd906de 0x468c21
#	0xd42a13	github.com/Shopify/sarama.(*Broker).responseReceiver+0x73	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/broker.go:1015
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43


3 @ 0x4383b6 0x405565 0x40511d 0xd38e58 0xd906de 0x468c21
#	0xd38e57	github.com/Shopify/sarama.(*partitionProducer).dispatch+0x6f7	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:606
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43

3 @ 0x4383b6 0x40640c 0x405e38 0xd3caff 0xd906de 0x468c21
#	0xd3cafe	github.com/Shopify/sarama.(*asyncProducer).retryHandler+0x19e	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:1052
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43


3 @ 0x4383b6 0x40640c 0x405e78 0xd37852 0xd906de 0x468c21
#	0xd37851	github.com/Shopify/sarama.(*asyncProducer).dispatcher+0xd1	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:331
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43

3 @ 0x4383b6 0x40640c 0x405e78 0xd8f13c 0xd906de 0x468c21
#	0xd8f13b	github.com/Shopify/sarama.(*syncProducer).handleSuccesses+0x9b	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/sync_producer.go:130
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43

3 @ 0x4383b6 0x40640c 0x405e78 0xd8f2a5 0xd906de 0x468c21
#	0xd8f2a4	github.com/Shopify/sarama.(*syncProducer).handleErrors+0xa4	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/sync_producer.go:138
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43


1 @ 0x4383b6 0x405565 0x40511d 0xd3a105 0xd402c3 0xd3e419 0xd42dae 0xd906de 0x468c21
#	0xd3a104	github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.2+0xc4	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:701
#	0xd402c2	github.com/Shopify/sarama.(*Broker).AsyncProduce.func1+0xc2			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/broker.go:388
#	0xd3e418	github.com/Shopify/sarama.(*responsePromise).handle+0x98			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/broker.go:132
#	0xd42dad	github.com/Shopify/sarama.(*Broker).responseReceiver+0x40d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/broker.go:1020
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d					/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43


1 @ 0x4383b6 0x40640c 0x405e38 0xd3faab 0xd3c6c5 0xd3b20e 0xd3a369 0xd906de 0x468c21
#	0xd3faaa	github.com/Shopify/sarama.(*Broker).Close+0xca			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/broker.go:269
#	0xd3c6c4	github.com/Shopify/sarama.(*brokerProducer).handleError+0x184	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:1031
#	0xd3b20d	github.com/Shopify/sarama.(*brokerProducer).handleResponse+0x2d	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:898
#	0xd3a368	github.com/Shopify/sarama.(*brokerProducer).run+0x1c8		/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:831
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43



1 @ 0x4383b6 0x40640c 0x405e78 0xd37fd4 0xd906de 0x468c21
#	0xd37fd3	github.com/Shopify/sarama.(*topicProducer).dispatch+0x53	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:413
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43


1 @ 0x4383b6 0x44914c 0x449126 0x464cc5 0x472b05 0xd41d37 0xd41d12 0xd401cb 0xd39f1c 0xd906de 0x468c21
#	0x464cc4	sync.runtime_SemacquireMutex+0x24					/usr/local/go/src/runtime/sema.go:71
#	0x472b04	sync.(*Mutex).lockSlow+0x164						/usr/local/go/src/sync/mutex.go:138
#	0xd41d36	sync.(*Mutex).Lock+0x96							/usr/local/go/src/sync/mutex.go:81
#	0xd41d11	github.com/Shopify/sarama.(*Broker).sendWithPromise+0x71		/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/broker.go:884
#	0xd401ca	github.com/Shopify/sarama.(*Broker).AsyncProduce+0x10a			/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/broker.go:405
#	0xd39f1b	github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1+0xdb	/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/async_producer.go:712
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d				/root/go/pkg/mod/github.com/!shopify/sarama@v1.31.0/utils.go:43


goroutine A  (*Broker).Close+0xca
brokerProducer run() loop
    -> handleError 
          -> bp.broker.Close
               -> bp.broker.lock.Locked, close broker's "responses" channel(buffered channel) ,blocked by broker's "done" channel, and asyncProducer's "responses" channel(zero buffered) could never be closed

goroutine B (*Broker).responseReceiver+0x40d
try pushing broker's dead info into asyncProducer's "responses" channel(zero buffered), blocked, and broker's done channel could never be closed, goroutine A will be blocked forever 



then other application code's goroutine(producers) will not work since bp.broker's lock will never be released


how about the below fix

func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse,1) //buffered channel should work?
	)
       //.......
       //.......
}