sarama: consumer group deadlock on close: consumer group shutdown hangs
Versions
Sarama Version: d84c59b2a2d87f185d91a1cc426a1f4d4e9365109fe0d96cbd2404c3a57c365a / release v1.22.0
Kafka Version: kafka_2.12-2.1.0.jar
Go Version: go version go1.12.1 linux/amd64
Configuration
What configuration values are you using for Sarama and Kafka?
Kafka: a single topic with only 1 partition and 2 consumers in a single consumer group
Logs
goroutine 64 [chan receive]:
github.com/Shopify/sarama.(*consumerGroup).Consume(0xc000159ea0, 0x14ed720, 0xc0000c4020, 0xc000436ae0, 0x1, 0x1, 0x14e40a0, 0xc0000ba3c0, 0x0, 0x0)
github.com/Shopify/sarama/consumer_group.go:175 +0x38b
internal/session.(*Service).Start.func1(0xc0000cc230)
created by internal/session.(*Service).Start
goroutine 78 [semacquire]:
sync.runtime_SemacquireMutex(0xc000159ef4, 0x900000000)
/usr/lib/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000159ef0)
/usr/lib/go/src/sync/mutex.go:134 +0x1e2
github.com/Shopify/sarama.(*consumerGroup).Close.func1()
github.com/Shopify/sarama/consumer_group.go:121 +0x94
sync.(*Once).Do(0xc000159f00, 0xc0000dff08)
/usr/lib/go/src/sync/once.go:44 +0xdf
github.com/Shopify/sarama.(*consumerGroup).Close(0xc000159ea0, 0x0, 0x0)
github.com/Shopify/sarama/consumer_group.go:118 +0x7a
internal/session.(*Service).Stop(0xc0000cc230)
goroutine 77 [select]:
github.com/Shopify/sarama.(*consumerGroupSession).heartbeatLoop(0xc000428500)
github.com/Shopify/sarama/consumer_group.go:701 +0x7e0
created by github.com/Shopify/sarama.newConsumerGroupSession
github.com/Shopify/sarama/consumer_group.go:505 +0x41b
goroutine 76 [select]:
github.com/Shopify/sarama.(*offsetManager).mainLoop(0xc0003fdce0)
github.com/Shopify/sarama/offset_manager.go:226 +0x1ef
github.com/Shopify/sarama.withRecover(0xc0001da8f0)
github.com/Shopify/sarama/utils.go:45 +0x51
created by github.com/Shopify/sarama.newOffsetManagerFromClient
github.com/Shopify/sarama/offset_manager.go:70 +0x3ae
Problem Description
Calling Close() on the consumer group hangs (deadlocks) while acquiring a mutex, because Consume() cannot finish: it is waiting for a session to complete, but no partitions were assigned to this consumer instance.
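The shape of the hang can be illustrated with a toy model based only on the stack traces above: goroutine 64 is blocked in a channel receive inside Consume, and goroutine 78 is blocked in Mutex.Lock inside Close. This is a simplified sketch, not sarama's code. The group type, its fields, and the closeHangs helper are hypothetical names, and the assumption that Consume holds the lock while it waits is an inference from the traces.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// group is a toy stand-in for a consumer group (hypothetical, not sarama's type).
type group struct {
	lock        sync.Mutex
	started     chan struct{} // closed once Consume holds the lock
	sessionDone chan struct{} // closed only when the session ends
}

// Consume holds the lock for its whole duration and waits for the session to end.
func (g *group) Consume() {
	g.lock.Lock()
	defer g.lock.Unlock()
	close(g.started)
	<-g.sessionDone // never fires if nothing ends the session
}

// Close needs the same lock, so it cannot proceed while Consume is blocked.
func (g *group) Close() {
	g.lock.Lock()
	defer g.lock.Unlock()
}

// closeHangs reports whether Close fails to return within the timeout.
func closeHangs(g *group, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		g.Close()
		close(done)
	}()
	select {
	case <-done:
		return false
	case <-time.After(timeout):
		return true
	}
}

func main() {
	g := &group{started: make(chan struct{}), sessionDone: make(chan struct{})}
	go g.Consume()
	<-g.started // Consume now holds the lock and waits on the session

	fmt.Println("close hangs while session is open:", closeHangs(g, 100*time.Millisecond))

	close(g.sessionDone) // ending the session lets Consume return and release the lock
	fmt.Println("close hangs after session ends:", closeHangs(g, time.Second))
}
```

In this model Close can only return once something ends the session, which matches the reported symptom that shutdown hangs when the consumer has no partitions assigned.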
About this issue
- State: closed
- Created 5 years ago
- Reactions: 8
- Comments: 20 (1 by maintainers)
@PapayaJuice yes, but the gods of concurrent programming will not praise me. The main idea is to ignore the data race that happens in the bad case.
The code spawns a goroutine which tries to close the consumer group. If that deadlocks, the client owned by the CS is closed forcibly by the goroutine which called CloseConsumerGroup. In the good case, closing the CS client has a happens-before relationship established by the wait in CloseConsumerGroup. In the bad case, it is closed with a data race. But whatever: cs.Close() will not block the calling thread, and shutdown will continue.
The wait channels in the arguments are required so that this function can wait for the goroutine running the consumer group session loop to finish. When cs.Consume() returns, that goroutine may want to do some cleanup.
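The code this comment refers to is not included here, so the following is only a sketch reconstructed from the description, not the commenter's implementation. It uses io.Closer stand-ins instead of sarama types. The function signature, the timeout parameter, the single consumeDone wait channel, and the stubCloser helper are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"io"
	"time"
)

// closeConsumerGroup tries to close the group in a separate goroutine. If that
// does not return within timeout, the client is closed anyway so that shutdown
// can continue. It reports whether the client close was forced.
// consumeDone is the wait channel: the goroutine running the Consume loop is
// expected to close it after finishing its cleanup (assumption).
func closeConsumerGroup(group, client io.Closer, consumeDone <-chan struct{}, timeout time.Duration) (forced bool) {
	closed := make(chan struct{})
	go func() {
		_ = group.Close() // may block forever if the deadlock is hit
		close(closed)
	}()

	select {
	case <-closed:
		// Good case: receiving from closed orders group.Close before client.Close.
	case <-time.After(timeout):
		// Bad case: group.Close is stuck; with real types, the client close
		// below may race with it.
		forced = true
	}
	_ = client.Close()

	// Give the Consume goroutine a bounded chance to finish its cleanup.
	select {
	case <-consumeDone:
	case <-time.After(timeout):
	}
	return forced
}

// stubCloser is a hypothetical stand-in for a consumer group or a client.
type stubCloser struct {
	hang   bool          // if true, Close never returns
	called chan struct{} // closed when Close is invoked
}

func newStub(hang bool) *stubCloser {
	return &stubCloser{hang: hang, called: make(chan struct{})}
}

func (s *stubCloser) Close() error {
	close(s.called)
	if s.hang {
		select {} // simulate the deadlocked Close
	}
	return nil
}

func main() {
	consumeDone := make(chan struct{})
	close(consumeDone) // pretend the Consume loop has already finished

	group, client := newStub(true), newStub(false)
	forced := closeConsumerGroup(group, client, consumeDone, 100*time.Millisecond)
	fmt.Println("client close was forced:", forced)
}
```

The trade-off is the one the comment describes: the goroutine stuck in the group's Close is leaked and the forced close is not synchronized with it, but the caller is never blocked indefinitely.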