franz-go: Azure Event Hubs: ProduceSync stuck waiting for WaitGroup

Hi,

I ran into an issue over the past few days that is hard to reproduce: my code stays blocked in the call shown below even after I cancel the context. I call ProduceSync from a partition consumer. I have two kgo.Clients, one for consuming and one for producing, and am running franz-go v1.14.1. Here are the relevant stack traces:

goroutine profile: total 68
16 @ 0x43baf6 0x406f9d 0x406a98 0xdef686 0x46f4e1
#	0xdef685	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).handlePartitionsLost.func1+0x65	/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:144

16 @ 0x43baf6 0x44ce6f 0x44ce46 0x46b307 0x47b8cb 0xcf4d59 0xdf5bd6 0xdf1d39 0xe84ded 0xe85386 0xe847fb 0xe844e5 0xe8408f 0xe83b3c 0xdf291d 0xdf5f05 0xdf0a6f 0xdeee99 0x46f4e1
#	0x46b306	sync.runtime_Semacquire+0x26											/opt/hostedtoolcache/go/1.20.1/x64/src/runtime/sema.go:62
#	0x47b8ca	sync.(*WaitGroup).Wait+0x4a											/opt/hostedtoolcache/go/1.20.1/x64/src/sync/waitgroup.go:116
#	0xcf4d58	github.com/twmb/franz-go/pkg/kgo.(*Client).ProduceSync+0x158							/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/producer.go:250
#	0xdf5bd5	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.producer.Send+0x55					/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/producer.go:74
#	0xdf1d38	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.instrumentedProducer.Send+0x158			/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/instrumenting.go:37
#	0xe84dec	github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.publisher.SendEvent+0x14c			/runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/publisher.go:48
#	0xe85385	github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.instrumentedPublisher.SendEvent+0x185		/runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/publisher.go:100
#	0xe847fa	github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.(*EventProcessor).doPublish+0x13a		/runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/processor.go:131
#	0xe844e4	github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.(*EventProcessor).processEvent+0x304		/runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/processor.go:116
#	0xe8408e	github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.(*EventProcessor).processEvents+0x12e		/runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/processor.go:86
#	0xe83b3b	github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.(*EventProcessor).OnRecords+0x15b		/runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/processor.go:53
#	0xdf291c	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.instrumentedRecordListener.OnRecords+0x3bc		/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/instrumenting.go:77
#	0xdf5f04	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.tracingRecordListener.OnRecords+0x164		/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/tracing.go:30
#	0xdf0a6e	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*worker).run+0x1ee				/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:309
#	0xdeee98	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).handlePartitionsAssigned.func1+0x58	/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:88

2 @ 0x43baf6 0x44bd7e 0xc896ae 0x46f4e1
#	0xc896ad	github.com/twmb/franz-go/pkg/kgo.(*Client).reapConnectionsLoop+0x16d	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:557

2 @ 0x43baf6 0x44bd7e 0xced338 0x46f4e1
#	0xced337	github.com/twmb/franz-go/pkg/kgo.(*Client).updateMetadataLoop+0x1f7	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/metadata.go:173

1 @ 0x43baf6 0x406f9d 0x406a98 0xd087ce 0x46f4e1
#	0xd087cd	github.com/twmb/franz-go/pkg/kgo.(*sink).handleSeqResps+0x2d	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/sink.go:410

1 @ 0x43baf6 0x4343b7 0x4699e9 0x4e2b32 0x4e3f19 0x4e3f07 0x56c569 0x57de05 0x71f7fd 0x50f2d8 0x71f9e5 0x71ced6 0x722dcf 0x722dd0 0x49cd3a 0xc8f297 0xc8f260 0x46f4e1
#	0x4699e8	internal/poll.runtime_pollWait+0x88					/opt/hostedtoolcache/go/1.20.1/x64/src/runtime/netpoll.go:306
#	0x4e2b31	internal/poll.(*pollDesc).wait+0x31					/opt/hostedtoolcache/go/1.20.1/x64/src/internal/poll/fd_poll_runtime.go:84
#	0x4e3f18	internal/poll.(*pollDesc).waitRead+0x298				/opt/hostedtoolcache/go/1.20.1/x64/src/internal/poll/fd_poll_runtime.go:89
#	0x4e3f06	internal/poll.(*FD).Read+0x286						/opt/hostedtoolcache/go/1.20.1/x64/src/internal/poll/fd_unix.go:167
#	0x56c568	net.(*netFD).Read+0x28							/opt/hostedtoolcache/go/1.20.1/x64/src/net/fd_posix.go:55
#	0x57de04	net.(*conn).Read+0x44							/opt/hostedtoolcache/go/1.20.1/x64/src/net/net.go:183
#	0x71f7fc	crypto/tls.(*atLeastReader).Read+0x3c					/opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:788
#	0x50f2d7	bytes.(*Buffer).ReadFrom+0x97						/opt/hostedtoolcache/go/1.20.1/x64/src/bytes/buffer.go:202
#	0x71f9e4	crypto/tls.(*Conn).readFromUntil+0xe4					/opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:810
#	0x71ced5	crypto/tls.(*Conn).readRecordOrCCS+0x115				/opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:617
#	0x722dce	crypto/tls.(*Conn).readRecord+0x16e					/opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:583
#	0x722dcf	crypto/tls.(*Conn).Read+0x16f						/opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:1316
#	0x49cd39	io.ReadAtLeast+0x99							/opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:332
#	0xc8f296	io.ReadFull+0x1d6							/opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:351
#	0xc8f25f	github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).readConn.func2+0x19f	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1126

1 @ 0x43baf6 0x44bd7e 0xc8ed4d 0xc8fc46 0xc91b6d 0xc91918 0x46f4e1
#	0xc8ed4c	github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).readConn+0x40c	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1142
#	0xc8fc45	github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).readResponse+0x85	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1217
#	0xc91b6c	github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).handleResp+0xec	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1448
#	0xc91917	github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).handleResps+0x77	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1438

1 @ 0x43baf6 0x44bd7e 0xcc1ceb 0xdef7a5 0xdef79f 0xdf033c 0xdefcd0 0xdee205 0xdee194 0x46f4e1
#	0xcc1cea	github.com/twmb/franz-go/pkg/kgo.(*Client).PollRecords+0x2ca						/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer.go:511
#	0xdef7a4	github.com/twmb/franz-go/pkg/kgo.(*Client).PollFetches+0x24						/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer.go:374
#	0xdef79e	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.clientPollerAdaptor.PollFetches+0x1e	/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:182
#	0xdf033b	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).pollFetches+0x15b		/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:246
#	0xdefccf	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).doRun+0x8f			/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:212
#	0xdee204	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).run+0xa4			/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:207
#	0xdee193	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*consumer2).Start.func1.1+0x33		/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/consumer2.go:136

1 @ 0x43baf6 0x44bd7e 0xcc9434 0x46f4e1
#	0xcc9433	github.com/twmb/franz-go/pkg/kgo.(*consumerSession).manageFetchConcurrency+0x153	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer.go:1465

1 @ 0x43baf6 0x44bd7e 0xd076c9 0x46f4e1
#	0xd076c8	github.com/twmb/franz-go/pkg/kgo.(*sink).drain+0xe8	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/sink.go:239

1 @ 0x43baf6 0x44ce6f 0x44ce46 0x46b307 0x47b8cb 0xdef23b 0xdef007 0xcd30c4 0xcd34e3 0x46f4e1
#	0x46b306	sync.runtime_Semacquire+0x26										/opt/hostedtoolcache/go/1.20.1/x64/src/runtime/sema.go:62
#	0x47b8ca	sync.(*WaitGroup).Wait+0x4a										/opt/hostedtoolcache/go/1.20.1/x64/src/sync/waitgroup.go:116
#	0xdef23a	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).handlePartitionsLost+0x15a	/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:153
#	0xdef006	github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).onPartitionsLost+0xe6	/runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:123
#	0xcd30c3	github.com/twmb/franz-go/pkg/kgo.(*consumer).initGroup.func2+0x3c3					/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer_group.go:266
#	0xcd34e2	github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).manage+0x3e2						/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer_group.go:350

1 @ 0x43baf6 0x46b6cc 0x46b6ac 0x477f6c 0xcc2230 0x46f4e1
#	0x46b6ab	sync.runtime_notifyListWait+0x12b					/opt/hostedtoolcache/go/1.20.1/x64/src/runtime/sema.go:527
#	0x477f6b	sync.(*Cond).Wait+0x8b							/opt/hostedtoolcache/go/1.20.1/x64/src/sync/cond.go:70
#	0xcc222f	github.com/twmb/franz-go/pkg/kgo.(*Client).PollRecords.func2+0xcf	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer.go:500

I am still looking at it from my end, but wanted to see if anything in these stacks stood out to you.

About this issue

  • State: closed
  • Created a year ago
  • Comments: 25 (11 by maintainers)

Most upvoted comments

The name is not used – it’s the number.