franz-go: Azure Event Hubs: ProduceSync stuck waiting for WaitGroup
Hi,
I ran into an issue over the past few days that is hard to reproduce. The outcome is that goroutines stay blocked inside ProduceSync, at the WaitGroup wait in producer.go:250, even after I cancel the context. I am calling ProduceSync from a partition consumer. I have two kgo.Clients, one consumer and one producer, and am running v1.14.1. Here are the relevant stack traces:
goroutine profile: total 68
16 @ 0x43baf6 0x406f9d 0x406a98 0xdef686 0x46f4e1
# 0xdef685 github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).handlePartitionsLost.func1+0x65 /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:144
16 @ 0x43baf6 0x44ce6f 0x44ce46 0x46b307 0x47b8cb 0xcf4d59 0xdf5bd6 0xdf1d39 0xe84ded 0xe85386 0xe847fb 0xe844e5 0xe8408f 0xe83b3c 0xdf291d 0xdf5f05 0xdf0a6f 0xdeee99 0x46f4e1
# 0x46b306 sync.runtime_Semacquire+0x26 /opt/hostedtoolcache/go/1.20.1/x64/src/runtime/sema.go:62
# 0x47b8ca sync.(*WaitGroup).Wait+0x4a /opt/hostedtoolcache/go/1.20.1/x64/src/sync/waitgroup.go:116
# 0xcf4d58 github.com/twmb/franz-go/pkg/kgo.(*Client).ProduceSync+0x158 /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/producer.go:250
# 0xdf5bd5 github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.producer.Send+0x55 /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/producer.go:74
# 0xdf1d38 github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.instrumentedProducer.Send+0x158 /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/instrumenting.go:37
# 0xe84dec github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.publisher.SendEvent+0x14c /runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/publisher.go:48
# 0xe85385 github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.instrumentedPublisher.SendEvent+0x185 /runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/publisher.go:100
# 0xe847fa github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.(*EventProcessor).doPublish+0x13a /runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/processor.go:131
# 0xe844e4 github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.(*EventProcessor).processEvent+0x304 /runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/processor.go:116
# 0xe8408e github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.(*EventProcessor).processEvents+0x12e /runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/processor.go:86
# 0xe83b3b github.com/XXXXXX/XXXXXX/internal/pkg/dataformatter.(*EventProcessor).OnRecords+0x15b /runner/_work/XXXXXX/XXXXXX/internal/pkg/dataformatter/processor.go:53
# 0xdf291c github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.instrumentedRecordListener.OnRecords+0x3bc /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/instrumenting.go:77
# 0xdf5f04 github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.tracingRecordListener.OnRecords+0x164 /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/tracing.go:30
# 0xdf0a6e github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*worker).run+0x1ee /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:309
# 0xdeee98 github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).handlePartitionsAssigned.func1+0x58 /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:88
2 @ 0x43baf6 0x44bd7e 0xc896ae 0x46f4e1
# 0xc896ad github.com/twmb/franz-go/pkg/kgo.(*Client).reapConnectionsLoop+0x16d /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:557
2 @ 0x43baf6 0x44bd7e 0xced338 0x46f4e1
# 0xced337 github.com/twmb/franz-go/pkg/kgo.(*Client).updateMetadataLoop+0x1f7 /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/metadata.go:173
1 @ 0x43baf6 0x406f9d 0x406a98 0xd087ce 0x46f4e1
# 0xd087cd github.com/twmb/franz-go/pkg/kgo.(*sink).handleSeqResps+0x2d /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/sink.go:410
1 @ 0x43baf6 0x4343b7 0x4699e9 0x4e2b32 0x4e3f19 0x4e3f07 0x56c569 0x57de05 0x71f7fd 0x50f2d8 0x71f9e5 0x71ced6 0x722dcf 0x722dd0 0x49cd3a 0xc8f297 0xc8f260 0x46f4e1
# 0x4699e8 internal/poll.runtime_pollWait+0x88 /opt/hostedtoolcache/go/1.20.1/x64/src/runtime/netpoll.go:306
# 0x4e2b31 internal/poll.(*pollDesc).wait+0x31 /opt/hostedtoolcache/go/1.20.1/x64/src/internal/poll/fd_poll_runtime.go:84
# 0x4e3f18 internal/poll.(*pollDesc).waitRead+0x298 /opt/hostedtoolcache/go/1.20.1/x64/src/internal/poll/fd_poll_runtime.go:89
# 0x4e3f06 internal/poll.(*FD).Read+0x286 /opt/hostedtoolcache/go/1.20.1/x64/src/internal/poll/fd_unix.go:167
# 0x56c568 net.(*netFD).Read+0x28 /opt/hostedtoolcache/go/1.20.1/x64/src/net/fd_posix.go:55
# 0x57de04 net.(*conn).Read+0x44 /opt/hostedtoolcache/go/1.20.1/x64/src/net/net.go:183
# 0x71f7fc crypto/tls.(*atLeastReader).Read+0x3c /opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:788
# 0x50f2d7 bytes.(*Buffer).ReadFrom+0x97 /opt/hostedtoolcache/go/1.20.1/x64/src/bytes/buffer.go:202
# 0x71f9e4 crypto/tls.(*Conn).readFromUntil+0xe4 /opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:810
# 0x71ced5 crypto/tls.(*Conn).readRecordOrCCS+0x115 /opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:617
# 0x722dce crypto/tls.(*Conn).readRecord+0x16e /opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:583
# 0x722dcf crypto/tls.(*Conn).Read+0x16f /opt/hostedtoolcache/go/1.20.1/x64/src/crypto/tls/conn.go:1316
# 0x49cd39 io.ReadAtLeast+0x99 /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:332
# 0xc8f296 io.ReadFull+0x1d6 /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:351
# 0xc8f25f github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).readConn.func2+0x19f /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1126
1 @ 0x43baf6 0x44bd7e 0xc8ed4d 0xc8fc46 0xc91b6d 0xc91918 0x46f4e1
# 0xc8ed4c github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).readConn+0x40c /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1142
# 0xc8fc45 github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).readResponse+0x85 /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1217
# 0xc91b6c github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).handleResp+0xec /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1448
# 0xc91917 github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).handleResps+0x77 /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/broker.go:1438
1 @ 0x43baf6 0x44bd7e 0xcc1ceb 0xdef7a5 0xdef79f 0xdf033c 0xdefcd0 0xdee205 0xdee194 0x46f4e1
# 0xcc1cea github.com/twmb/franz-go/pkg/kgo.(*Client).PollRecords+0x2ca /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer.go:511
# 0xdef7a4 github.com/twmb/franz-go/pkg/kgo.(*Client).PollFetches+0x24 /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer.go:374
# 0xdef79e github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.clientPollerAdaptor.PollFetches+0x1e /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:182
# 0xdf033b github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).pollFetches+0x15b /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:246
# 0xdefccf github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).doRun+0x8f /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:212
# 0xdee204 github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).run+0xa4 /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:207
# 0xdee193 github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*consumer2).Start.func1.1+0x33 /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/consumer2.go:136
1 @ 0x43baf6 0x44bd7e 0xcc9434 0x46f4e1
# 0xcc9433 github.com/twmb/franz-go/pkg/kgo.(*consumerSession).manageFetchConcurrency+0x153 /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer.go:1465
1 @ 0x43baf6 0x44bd7e 0xd076c9 0x46f4e1
# 0xd076c8 github.com/twmb/franz-go/pkg/kgo.(*sink).drain+0xe8 /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/sink.go:239
1 @ 0x43baf6 0x44ce6f 0x44ce46 0x46b307 0x47b8cb 0xdef23b 0xdef007 0xcd30c4 0xcd34e3 0x46f4e1
# 0x46b306 sync.runtime_Semacquire+0x26 /opt/hostedtoolcache/go/1.20.1/x64/src/runtime/sema.go:62
# 0x47b8ca sync.(*WaitGroup).Wait+0x4a /opt/hostedtoolcache/go/1.20.1/x64/src/sync/waitgroup.go:116
# 0xdef23a github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).handlePartitionsLost+0x15a /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:153
# 0xdef006 github.com/XXXXXX/XXXXXX/internal/pkg/kafka/franz.(*coordinator).onPartitionsLost+0xe6 /runner/_work/XXXXXX/XXXXXX/internal/pkg/kafka/franz/coordinator.go:123
# 0xcd30c3 github.com/twmb/franz-go/pkg/kgo.(*consumer).initGroup.func2+0x3c3 /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer_group.go:266
# 0xcd34e2 github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).manage+0x3e2 /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer_group.go:350
1 @ 0x43baf6 0x46b6cc 0x46b6ac 0x477f6c 0xcc2230 0x46f4e1
# 0x46b6ab sync.runtime_notifyListWait+0x12b /opt/hostedtoolcache/go/1.20.1/x64/src/runtime/sema.go:527
# 0x477f6b sync.(*Cond).Wait+0x8b /opt/hostedtoolcache/go/1.20.1/x64/src/sync/cond.go:70
# 0xcc222f github.com/twmb/franz-go/pkg/kgo.(*Client).PollRecords.func2+0xcf /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.14.1/pkg/kgo/consumer.go:500
I am still looking at it from my end, but wanted to see if anything in these stacks stood out to you.
About this issue
- State: closed
- Created a year ago
- Comments: 25 (11 by maintainers)
The name is not used – it’s the number.